Streaming Analytics Using Kafka SQL
Learn how to enrich data from Citi Bike and find the number of trips on a particular day to and from a particular station.
Kafka SQL, a streaming SQL engine for Apache Kafka by Confluent, is used for real-time data integration, data monitoring, and data anomaly detection. KSQL is used to read, write, and process Citi Bike trip data in real-time, enrich the trip data with other station details, and find the number of trips started and ended in a day for a particular station. It is also used to publish trip data from the source to other destinations for further analysis.
In this article, let's discuss enriching the Citi Bike trip data and finding the number of trips on a particular day to and from a particular station.
Install the following:
- Apache Kafka
The trip dataset from March 2017 is used as the source data. It contains basic details such as trip duration, ride start time, ride end time, station ID, station name, station latitude, and station longitude.
The station dataset is used to enrich trip details for further analysis after data consumption. It contains basic details such as
- Enrich Citi Bike trip data in real time using joining and aggregation concepts.
- Find the number of trips on a day to and from a particular station.
- View trip details with station details and aggregate the trip count of each station.
- Produce station details.
- Join stream data and table data.
- Group data.
- Produce trip details.
- View output.
- View trip details with station details.
- View aggregate trip count for each station.
Producing Station Details
To produce the station details using Scala, perform the following.
- Create the trip-details and station-details topics in Kafka using the below commands:
./bin/kafka-topics --create --zookeeper localhost:2181 --topic station-details --replication-factor 1 --partitions 1
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-details --replication-factor 1 --partitions 1
- Read the station data from the URL https://feeds.citibikenyc.com/stations/stations.json.
- Iterate over the station list to produce a JSON file.
- Produce the station data into the station-details topic using the below command:
java -cp kafka_producer_consumer.jar com.treselle.kafka.core.Producer station-details localhost:9092 station_data
- Iterate and produce the station-details list in JSON format.
- Check the produced station details by consuming them using the below command:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic station-details --from-beginning
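The read-and-iterate steps above can be sketched roughly as follows. This is a minimal Python illustration, not the article's Scala producer: it assumes the feed wraps the stations in a top-level list named stationBeanList, works on a small hypothetical sample instead of fetching the URL, and only prints the per-station JSON messages rather than sending them to Kafka.

```python
import json

# Hypothetical sample shaped like the station feed. The top-level key
# "stationBeanList" and all values below are assumptions for illustration.
SAMPLE_FEED = """
{
  "executionTime": "2017-03-31 11:59:59 PM",
  "stationBeanList": [
    {"id": 72, "stationName": "W 52 St & 11 Ave", "availableDocks": 30,
     "totalDocks": 39, "availableBikes": 8, "statusValue": "In Service"},
    {"id": 79, "stationName": "Franklin St & W Broadway", "availableDocks": 5,
     "totalDocks": 33, "availableBikes": 27, "statusValue": "In Service"}
  ]
}
"""


def station_messages(feed_text, list_key="stationBeanList"):
    """Return one compact JSON string per station found in the feed."""
    feed = json.loads(feed_text)
    return [
        json.dumps(station, separators=(",", ":"))
        for station in feed.get(list_key, [])
    ]


# Each string is one message that a producer could send to station-details.
messages = station_messages(SAMPLE_FEED)
for message in messages:
    print(message)
```

Emitting one JSON object per message keeps each station as a separate record, which is the shape a JSON-formatted KSQL table over the station-details topic expects.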
Joining Stream Data and Table Data
To join the stream and table data, perform the following.
- In the KSQL console, create a table for the station details to join it with the trip details while producing the stream using the below commands:
CREATE TABLE station_details_table ( id BIGINT, stationName VARCHAR, availableDocks BIGINT, totalDocks BIGINT, latitude DOUBLE, longitude DOUBLE, statusValue VARCHAR, statusKey BIGINT, availableBikes BIGINT, stAddress1 VARCHAR, stAddress2 VARCHAR, city VARCHAR, postalCode VARCHAR, location VARCHAR, altitude VARCHAR, testStation BOOLEAN, lastCommunicationTime VARCHAR, landMark VARCHAR ) WITH ( kafka_topic='station-details', value_format='JSON' );
- In the KSQL console, create a stream for the trip details to enrich the data with the start station details and to find the trip count of each station for the day using the below commands:
CREATE STREAM trip_details_stream ( tripduration BIGINT, starttime VARCHAR, stoptime VARCHAR, start_station_id BIGINT, start_station_name VARCHAR, start_station_latitude DOUBLE, start_station_longitude DOUBLE, end_station_id BIGINT, end_station_name VARCHAR, end_station_latitude DOUBLE, end_station_longitude DOUBLE, bikeid INT, usertype VARCHAR, birth_year VARCHAR, gender VARCHAR ) WITH ( kafka_topic='trip-details', value_format='DELIMITED' );
- Join the stream with the station details table to get fields such as availableDocks, using the station ID as the key.
- Extract the start time as a long timestamp, so that the day can later be derived from it to count the trips started per day, using the below commands:
CREATE STREAM citibike_trip_start_station_details WITH ( value_format='JSON' ) AS SELECT a.tripduration, a.starttime, STRINGTOTIMESTAMP(a.starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp, a.start_station_id, a.start_station_name, a.start_station_latitude, a.start_station_longitude, a.bikeid, a.usertype, a.birth_year, a.gender, b.availableDocks AS start_station_availableDocks, b.totalDocks AS start_station_totalDocks, b.availableBikes AS start_station_availableBikes, b.statusValue AS start_station_service_value FROM trip_details_stream a LEFT JOIN station_details_table b ON a.start_station_id=b.id;
- Add the end station details with trip details in another topic similar to the start station.
- Extract the end time field as a long timestamp using the below commands:
CREATE STREAM citibike_trip_end_station_details WITH ( value_format='JSON' ) AS SELECT a.tripduration, a.stoptime, STRINGTOTIMESTAMP(a.stoptime, 'yyyy-MM-dd HH:mm:ss') AS stoptime_timestamp, a.end_station_id, a.end_station_name, a.end_station_latitude, a.end_station_longitude, a.bikeid, a.usertype, a.birth_year, a.gender, b.availableDocks AS end_station_availableDocks, b.totalDocks AS end_station_totalDocks, b.availableBikes AS end_station_availableBikes, b.statusValue AS end_station_service_value FROM trip_details_stream a LEFT JOIN station_details_table b ON a.end_station_id=b.id;
- Join the streamed trip details with the station details table, as the KSQL version used here does not allow joining two streams or two tables.
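To make the join semantics above concrete, here is a small in-memory Python sketch of a stream-table left join. It is only an illustration under assumptions: the station table is modeled as a dictionary keyed by station ID, the trips and station values are hypothetical, and the time strings are treated as UTC (KSQL's STRINGTOTIMESTAMP may apply a different time zone). The column name startime_timestamp follows the spelling used in the query above.

```python
from datetime import datetime, timezone


def string_to_timestamp(text, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a time string into epoch milliseconds, assuming UTC."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def left_join(trips, stations_by_id):
    """Enrich each trip with start-station fields; unmatched trips get None."""
    enriched = []
    for trip in trips:
        # A left join keeps the trip even when no station row matches.
        station = stations_by_id.get(trip["start_station_id"], {})
        row = dict(trip)
        row["startime_timestamp"] = string_to_timestamp(trip["starttime"])
        row["start_station_availableDocks"] = station.get("availableDocks")
        row["start_station_totalDocks"] = station.get("totalDocks")
        enriched.append(row)
    return enriched


# Hypothetical table contents and stream records.
stations_by_id = {72: {"availableDocks": 30, "totalDocks": 39}}
trips = [
    {"start_station_id": 72, "starttime": "2017-03-01 00:00:13"},
    {"start_station_id": 999, "starttime": "2017-03-01 00:05:00"},
]

enriched = left_join(trips, stations_by_id)
for row in enriched:
    print(row)
```

The second trip refers to a station ID that is missing from the table, so its station fields come back empty, which mirrors what a LEFT JOIN does for unmatched stream records.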
Grouping Data
To group data based on the station details and date, perform the following.
- Format the date as YYYY-MM-DD from the long timestamp to group by date in the start trip details using the below commands:
CREATE STREAM citibike_trip_start_station_details_with_date AS SELECT TIMESTAMPTOSTRING(startime_timestamp, 'yyyy-MM-dd') AS DATE, starttime, start_station_id, start_station_name FROM citibike_trip_start_station_details;
- Format the date as YYYY-MM-DD from the long timestamp to group by date in the end trip details using the below commands:
CREATE STREAM citibike_trip_end_station_details_with_date AS SELECT TIMESTAMPTOSTRING(stoptime_timestamp, 'yyyy-MM-dd') AS DATE, stoptime, end_station_id, end_station_name FROM citibike_trip_end_station_details;
- Create a table by grouping the data based on the date and the stations for finding the started trip counts and the ended trip counts of each station for the day using the below commands:
CREATE TABLE start_trip_count_by_stations AS SELECT DATE, start_station_id, start_station_name, COUNT(*) AS trip_count FROM citibike_trip_start_station_details_with_date GROUP BY DATE, start_station_name, start_station_id;
CREATE TABLE end_trip_count_by_stations AS SELECT DATE, end_station_id, end_station_name, COUNT(*) AS trip_count FROM citibike_trip_end_station_details_with_date GROUP BY DATE, end_station_name, end_station_id;
- List the topics to check whether they're created for persistent queries.
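The grouping steps above amount to deriving a date string from each timestamp and counting records per date and station. A minimal Python sketch of that logic follows; the rows, field names (timestamp, station_id, station_name), and the use of UTC are assumptions for illustration, not the behavior of the KSQL engine itself.

```python
from collections import Counter
from datetime import datetime, timezone


def timestamp_to_date(epoch_millis):
    """Format epoch milliseconds as YYYY-MM-DD, assuming UTC."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d")


def trip_counts(rows):
    """Count trips per (date, station ID, station name) group."""
    counts = Counter()
    for row in rows:
        key = (
            timestamp_to_date(row["timestamp"]),
            row["station_id"],
            row["station_name"],
        )
        counts[key] += 1
    return counts


# Hypothetical enriched trip rows: two on 2017-03-01 and one on 2017-03-02.
rows = [
    {"timestamp": 1488326413000, "station_id": 72, "station_name": "W 52 St & 11 Ave"},
    {"timestamp": 1488330000000, "station_id": 72, "station_name": "W 52 St & 11 Ave"},
    {"timestamp": 1488412800000, "station_id": 72, "station_name": "W 52 St & 11 Ave"},
]

counts = trip_counts(rows)
for key, count in sorted(counts.items()):
    print(key, count)
```

Unlike this batch computation, the KSQL table is updated continuously, so each incoming trip produces a new count for its group.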
Producing Trip Details
To produce the trip details into the trip-details topic using Scala, use the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic station-details --from-beginning
From the above console output, it is evident that a total of 727,664 messages are produced for data enrichment at the stream.
Let's see how to view trip details with station details and how to view the aggregate trip count of each station.
Viewing Trip Details With Station Details
To view the trip details with the station details, perform the following:
- Consume the messages from the topic CITIBIKE_TRIP_START_STATION_DETAILS to view the extra fields added to the trip details from the station details table and the long timestamp field extracted from the start and end times, using the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic CITIBIKE_TRIP_START_STATION_DETAILS --from-beginning
- Consume the messages from the topic CITIBIKE_TRIP_END_STATION_DETAILS using the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic CITIBIKE_TRIP_END_STATION_DETAILS --from-beginning
From the above console output, it is evident that the fields of the station details are added to the trip while producing the trip details.
Viewing Aggregate Trip Count of Each Station
To view the aggregate trip count of each station based on the date, perform the following:
- Consume the message via the console to check the trip counts obtained on the stream using the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic START_TRIP_COUNT_BY_STATIONS --from-beginning
From the above console output, it is evident that each time a message is produced, an updated trip count for that station and day is added to the topic. A consumer can therefore keep only the latest trip count for each station and day for further analysis.
- Obtain the end trip count details based on the stations using the below commands:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic END_TRIP_COUNT_BY_STATIONS --from-beginning
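Because the count topics carry every intermediate update, a consumer that wants only the latest trip count has to keep the last value seen per key. The following Python sketch shows one way to do that; it assumes the consumed messages have already been parsed into ((date, station ID), count) pairs in arrival order, and the sample values are hypothetical.

```python
def latest_counts(updates):
    """Keep only the most recent count seen for each (date, station) key."""
    latest = {}
    for key, count in updates:
        # Later updates overwrite earlier ones for the same key.
        latest[key] = count
    return latest


# Hypothetical sequence of updates as they might arrive from the topic.
updates = [
    (("2017-03-01", 72), 1),
    (("2017-03-01", 79), 1),
    (("2017-03-01", 72), 2),
    (("2017-03-01", 72), 3),
]

latest = latest_counts(updates)
for key, count in sorted(latest.items()):
    print(key, count)
```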
In this article, we discussed adding extra fields from the station details table, extracting date in the YYYY-MM-DD format, and grouping the details based on the station ID and the day to get the start and end trip count details of the station.
Published at DZone with permission of Rathnadevi Manivannan.