Streaming Analytics Using Kafka SQL

Learn how to enrich data from Citi Bike and find the number of trips on a particular day to and from a particular station.

Kafka SQL (KSQL), a streaming SQL engine for Apache Kafka by Confluent, is used for real-time data integration, data monitoring, and data anomaly detection. Here, KSQL is used to read, write, and process Citi Bike trip data in real time, enrich the trip data with station details, and find the number of trips started and ended in a day at a particular station. It is also used to publish trip data from the source to other destinations for further analysis.

In this article, let's discuss enriching the Citi Bike trip data and finding the number of trips on a particular day to and from a particular station.

Prerequisites

Install the following:

  • Scala
  • Apache Kafka
  • KSQL
  • JDK

Data Description

The trip dataset from March 2017 is used as the source data. It contains basic details such as trip duration, ride start time, ride end time, station ID, station name, station latitude, and station longitude.
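
As a rough illustration of the trip record layout, the sketch below maps one comma-delimited row onto named fields. It is written in Python rather than the Scala used by the article's producers. The column order follows the trip stream defined later in this article; the sample row values are made up for illustration and are not taken from the dataset.

```python
import csv

# Column names in the order used by the trip stream definition.
TRIP_COLUMNS = [
    "tripduration", "starttime", "stoptime",
    "start_station_id", "start_station_name",
    "start_station_latitude", "start_station_longitude",
    "end_station_id", "end_station_name",
    "end_station_latitude", "end_station_longitude",
    "bikeid", "usertype", "birth_year", "gender",
]

# Hypothetical sample row; the values are illustrative only.
SAMPLE_ROW = (
    '1893,2017-03-01 00:00:32,2017-03-01 00:32:05,'
    '72,"W 52 St & 11 Ave",40.767,-73.993,'
    '79,"Franklin St & W Broadway",40.719,-74.006,'
    '25000,Subscriber,1985,1'
)

def parse_trip(row_text):
    """Split one delimited row and pair each value with its column name."""
    values = next(csv.reader([row_text]))
    if len(values) != len(TRIP_COLUMNS):
        raise ValueError(f"expected {len(TRIP_COLUMNS)} fields, got {len(values)}")
    trip = dict(zip(TRIP_COLUMNS, values))
    # Convert the integer columns; the remaining fields stay as strings.
    for name in ("tripduration", "start_station_id", "end_station_id", "bikeid"):
        trip[name] = int(trip[name])
    return trip

trip = parse_trip(SAMPLE_ROW)
print(trip["start_station_name"], trip["tripduration"])
```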

The station dataset is used to enrich trip details for further analysis after data consumption. It contains basic details such as availableBikes, availableDocks, statusValue, and totalDocks.

Use Case

  • Enrich Citi Bike trip data in real time using joining and aggregation concepts.
  • Find the number of trips on a day to and from a particular station.
  • View trip details with station details and aggregate the trip count of each station.

Synopsis

  • Produce station details.
  • Join stream data and table data.
  • Group data.
  • Produce trip details.
  • View output.
    • View trip details with station details.
    • View aggregate trip count for each station.

Producing Station Details

To produce the station details using Scala, perform the following.

  • Create the trip-details and station-details topics in Kafka using the below commands:
./bin/kafka-topics --create --zookeeper localhost:2181 --topic station-details --replication-factor 1 --partitions 1
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-details --replication-factor 1 --partitions 1

    • Read the station data from the URL https://feeds.citibikenyc.com/stations/stations.json.
    • Iterate the station list to produce a JSON file.
  • Produce station data into the station-details topic by running the Scala producer with the below command:
java -cp kafka_producer_consumer.jar com.treselle.kafka.core.Producer station-details localhost:9092 station_data

  • Iterate and produce the station-details list in JSON format.
  • Check the produced and consumed station details using the below command:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic station-details --from-beginning
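
The read-and-iterate steps above can be sketched roughly as follows. This is a Python illustration, not the article's Scala producer. It works on an inline sample instead of calling the feed URL, and it assumes the feed wraps the stations in a top-level "stationBeanList" array whose entries use the field names of the station table in this article. Keying each message by the station ID is also an assumption of this sketch, made because the table is later joined on that ID.

```python
import json

# Hypothetical sample shaped like the station feed. The "stationBeanList"
# key and the station values are assumptions for illustration.
SAMPLE_FEED = """
{
  "executionTime": "2018-03-01 10:00:00 AM",
  "stationBeanList": [
    {"id": 72, "stationName": "W 52 St & 11 Ave", "availableDocks": 30,
     "totalDocks": 39, "availableBikes": 8, "statusValue": "In Service"},
    {"id": 79, "stationName": "Franklin St & W Broadway", "availableDocks": 5,
     "totalDocks": 33, "availableBikes": 27, "statusValue": "In Service"}
  ]
}
"""

def station_messages(feed_text):
    """Return (key, value) pairs: the station ID as key, one JSON document per station as value."""
    feed = json.loads(feed_text)
    messages = []
    for station in feed["stationBeanList"]:
        key = str(station["id"])
        value = json.dumps(station, sort_keys=True)
        messages.append((key, value))
    return messages

messages = station_messages(SAMPLE_FEED)
for key, value in messages:
    # A real producer would send each pair to the station-details topic here.
    print(key, value)
```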

Joining Stream Data and Table Data

To join the stream and table data, perform the following.

  • In the KSQL console, create a table for the station details to join it with the trip details while producing the stream using the below commands:
CREATE TABLE
station_details_table
(
id BIGINT,
stationName VARCHAR,
availableDocks BIGINT,
totalDocks BIGINT,
latitude DOUBLE,
longitude DOUBLE,
statusValue VARCHAR,
statusKey BIGINT,
availableBikes BIGINT,
stAddress1 VARCHAR,
stAddress2 VARCHAR,
city VARCHAR,
postalCode VARCHAR,
location VARCHAR,
altitude VARCHAR,
testStation BOOLEAN,
lastCommunicationTime VARCHAR,
landMark VARCHAR
)
WITH
(
kafka_topic='station-details',
value_format='JSON'
);

  • In the KSQL console, create a stream for the trip details to enrich the data with the start station details and to find the trip count of each station for the day using the below commands:
CREATE STREAM
trip_details_stream
(
tripduration BIGINT,
starttime VARCHAR,
stoptime VARCHAR,
start_station_id BIGINT,
start_station_name VARCHAR,
start_station_latitude DOUBLE,
start_station_longitude DOUBLE,
end_station_id BIGINT,
end_station_name VARCHAR,
end_station_latitude DOUBLE,
end_station_longitude DOUBLE,
bikeid INT,
usertype VARCHAR,
birth_year VARCHAR,
gender VARCHAR
)
WITH
(
kafka_topic='trip-details',
value_format='DELIMITED'
);

  • Join the stream with the station details table to get fields such as availableBikes, totalDocks, and availableDocks, using the station ID as the key.
  • In the SELECT statement, convert the start time string to a long timestamp, so that the day can later be extracted from it and the trips started in a day can be counted, using the below commands:
CREATE STREAM
citibike_trip_start_station_details WITH
(
value_format='JSON'
) AS
SELECT
a.tripduration,
a.starttime,
STRINGTOTIMESTAMP(a.starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp,
a.start_station_id,
a.start_station_name,
a.start_station_latitude,
a.start_station_longitude,
a.bikeid,
a.usertype,
a.birth_year,
a.gender,
b.availableDocks AS start_station_availableDocks,
b.totalDocks AS start_station_totalDocks,
b.availableBikes AS start_station_availableBikes,
b.statusValue AS start_station_service_value
FROM
trip_details_stream a
LEFT JOIN
station_details_table b
ON
a.start_station_id=b.id;

  • Add the end station details with trip details in another topic similar to the start station.
  • Extract the end time field as a long timestamp using the below commands:
CREATE STREAM
citibike_trip_end_station_details WITH
(
value_format='JSON'
) AS
SELECT
a.tripduration,
a.stoptime,
STRINGTOTIMESTAMP(a.stoptime, 'yyyy-MM-dd HH:mm:ss') AS stoptime_timestamp,
a.end_station_id,
a.end_station_name,
a.end_station_latitude,
a.end_station_longitude,
a.bikeid,
a.usertype,
a.birth_year,
a.gender,
b.availableDocks AS end_station_availableDocks,
b.totalDocks AS end_station_totalDocks,
b.availableBikes AS end_station_availableBikes,
b.statusValue AS end_station_service_value
FROM
trip_details_stream a
LEFT JOIN
station_details_table b
ON
a.end_station_id=b.id;

  • Join the streamed trip details with the station details table, as the KSQL release used here does not allow joining two streams or two tables (later releases added such joins).
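
To make the join semantics concrete, here is a minimal Python sketch of what the start station query above does for each trip: convert the start time string to epoch milliseconds and look up the station by ID, keeping the trip even when no station matches (a left join). It is an illustration only, not how KSQL is implemented. The station values are hypothetical, and the sketch assumes UTC for the time conversion, whereas KSQL's STRINGTOTIMESTAMP may use a different time zone depending on configuration.

```python
from datetime import datetime, timezone

def string_to_timestamp(text, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a time string into epoch milliseconds (UTC is an assumption of this sketch)."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)

# Station table keyed by station ID; the values are hypothetical.
STATIONS = {
    72: {"availableDocks": 30, "totalDocks": 39,
         "availableBikes": 8, "statusValue": "In Service"},
}

def enrich_start(trip, stations):
    """Left join: every trip is kept; station fields are None when no station matches."""
    station = stations.get(trip["start_station_id"], {})
    enriched = dict(trip)
    enriched["startime_timestamp"] = string_to_timestamp(trip["starttime"])
    enriched["start_station_availableDocks"] = station.get("availableDocks")
    enriched["start_station_totalDocks"] = station.get("totalDocks")
    enriched["start_station_availableBikes"] = station.get("availableBikes")
    enriched["start_station_service_value"] = station.get("statusValue")
    return enriched

trips = [
    {"starttime": "2017-03-01 00:00:41", "start_station_id": 72},
    {"starttime": "2017-03-01 00:05:00", "start_station_id": 9999},  # no such station
]
enriched_trips = [enrich_start(t, STATIONS) for t in trips]
for row in enriched_trips:
    print(row)
```

The second trip has no matching station, so it still appears in the output with empty station fields, which is the behavior a LEFT JOIN is expected to give.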

Grouping Data

To group data based on the station details and date, perform the following.

  • Format the date as YYYY-MM-DD from the long timestamp to group by date in the start trip details using the below commands:
CREATE STREAM
citibike_trip_start_station_details_with_date AS
SELECT
TIMESTAMPTOSTRING(startime_timestamp, 'yyyy-MM-dd') AS DATE,
starttime,
start_station_id,
start_station_name
FROM
citibike_trip_start_station_details;

  • Format the date as YYYY-MM-DD from the long timestamp to group by date in the end trip details using the below commands:
CREATE STREAM
citibike_trip_end_station_details_with_date AS
SELECT
TIMESTAMPTOSTRING(stoptime_timestamp, 'yyyy-MM-dd') AS DATE,
stoptime,
end_station_id,
end_station_name
FROM
citibike_trip_end_station_details;

  • Create a table by grouping the data based on the date and the stations for finding the started trip counts and the ended trip counts of each station for the day using the below commands:
CREATE TABLE
start_trip_count_by_stations AS
SELECT
DATE,
start_station_id,
start_station_name,
COUNT(*) AS trip_count
FROM
citibike_trip_start_station_details_with_date
GROUP BY
DATE,
start_station_name,
start_station_id;

CREATE TABLE
end_trip_count_by_stations AS
SELECT
DATE,
end_station_id,
end_station_name,
COUNT(*) AS trip_count
FROM
citibike_trip_end_station_details_with_date
GROUP BY
DATE,
end_station_name,
end_station_id;

  • List the topics (for example, with SHOW TOPICS in the KSQL console) to check whether the topics for the persistent queries are created.
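
The grouping steps above can be pictured with the following Python sketch. It formats a long timestamp as a yyyy-MM-dd date and keeps a running count per date and station, emitting an updated count for every incoming trip, which is roughly how a continuously updated count table behaves. The events are hypothetical, and UTC is assumed for the date conversion.

```python
from collections import Counter
from datetime import datetime, timezone

def timestamp_to_date(epoch_millis):
    """Format epoch milliseconds as a yyyy-MM-dd date string (UTC assumed in this sketch)."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d")

def running_counts(events):
    """Count trips per (date, station name, station ID), recording every update."""
    counts = Counter()
    updates = []
    for epoch_millis, station_id, station_name in events:
        key = (timestamp_to_date(epoch_millis), station_name, station_id)
        counts[key] += 1
        updates.append((key, counts[key]))
    return updates

# Hypothetical events: (start timestamp in ms, station ID, station name).
EVENTS = [
    (1488326441000, 72, "W 52 St & 11 Ave"),  # 2017-03-01 00:00:41 UTC
    (1488330000000, 72, "W 52 St & 11 Ave"),  # 2017-03-01 01:00:00 UTC
    (1488412800000, 72, "W 52 St & 11 Ave"),  # 2017-03-02 00:00:00 UTC
]

updates = running_counts(EVENTS)
for key, count in updates:
    print(key, count)
```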

Producing Trip Details

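To produce the trip details into the trip-details topic, run the Scala producer against that topic in the same way as for the station details. The produced messages can be checked using the below command:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic trip-details --from-beginning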

From the console output, it is evident that a total of 727,664 messages are produced for data enrichment in the stream.

Viewing Output

Let's see how to view trip details with station details and how to view the aggregate trip count of each station.

Viewing Trip Details With Station Details

To view the trip details with the station details, perform the following:

  • Consume the messages from the topic CITIBIKE_TRIP_START_STATION_DETAILS to view the extra fields added to the trip details from the station details table and the long timestamp field extracted from the start time, using the below command:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic CITIBIKE_TRIP_START_STATION_DETAILS --from-beginning

  • Consume the messages from the topic CITIBIKE_TRIP_END_STATION_DETAILS using the below command:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic CITIBIKE_TRIP_END_STATION_DETAILS --from-beginning

From the above console output, it is evident that the fields of the station details are added to the trip while producing the trip details.

Viewing Aggregate Trip Count of Each Station

To view the aggregate trip count of each station based on the date, perform the following:

  • Consume the messages via the console to check the start trip counts obtained on the stream using the below command:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic START_TRIP_COUNT_BY_STATIONS --from-beginning

From the console output, it is evident that the trip counts are updated and added to the topic for each day as messages are produced. This data can be filtered to the latest trip count in the consumer for further analysis.

  • Obtain the end trip count details based on the stations using the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic END_TRIP_COUNT_BY_STATIONS --from-beginning
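
Filtering the counts topic down to the latest trip count, as mentioned above, could look roughly like the Python sketch below. It assumes the consumer has already decoded each message into a (date, station ID) key and a count; the changelog entries are hypothetical.

```python
# Hypothetical changelog as it might be read from a counts topic:
# each entry is ((date, station_id), trip_count), in arrival order.
CHANGELOG = [
    (("2017-03-01", 72), 1),
    (("2017-03-01", 79), 1),
    (("2017-03-01", 72), 2),
    (("2017-03-02", 72), 1),
    (("2017-03-01", 72), 3),
]

def latest_counts(changelog):
    """Keep only the most recent count seen for each (date, station) key."""
    latest = {}
    for key, count in changelog:
        latest[key] = count  # later entries overwrite earlier ones
    return latest

latest = latest_counts(CHANGELOG)
for key in sorted(latest):
    print(key, latest[key])
```

Because every update for a key carries the full running count, keeping the last value per key is enough; no summing is needed on the consumer side.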

Conclusion

In this article, we discussed adding extra fields from the station details table, extracting date in the YYYY-MM-DD format, and grouping the details based on the station ID and the day to get the start and end trip count details of the station.

Published at DZone with permission of
