
Custom Partitioning and Analysis Using Kafka SQL Windowing


Learn how to partition Citi Bike trip data based on user type with the Custom partitioning technique and analyze trip details on the stream using Kafka SQL Windowing.


Apache Kafka uses a round-robin technique by default to distribute messages across multiple partitions. The Custom partitioning technique instead produces a particular type of message to a defined partition, so that it is consumed by a particular consumer. This technique gives us control over where produced messages land. Windowing allows event-time-driven analysis and grouping of data based on time limits. The three different types of windowing are Tumbling, Session, and Hopping.

In this article, we will discuss processing Citi Bike trip data in the following ways:

  • Partitioning trip data based on user type using the Custom partitioning technique.
  • Analyzing trip details on the stream using Kafka SQL Windowing.

Prerequisites

Install the following:

  • Scala
  • Java
  • Kafka
  • Confluent
  • KSQL

Data Description

The trip dataset of Citi Bike March 2017 is used as the source data. It contains basic details such as trip duration, start time, stop time, station name, station ID, station latitude, and station longitude.

Sample dataset:
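For orientation, a record in the source CSV has the following column order (the row below is illustrative; the values are made up, not actual records from the dataset):

tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
680,2017-03-01 00:00:00,2017-03-01 00:11:20,3183,Exchange Place,40.7162,-74.0334,3199,Newport Pkwy,40.7287,-74.0321,24575,Subscriber,1984,1

Note that usertype is the 13th field (index 12), which the Custom partitioner shown later relies on.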

Use Case

  • Send Citi Bike trip data to two different brokers by partitioning the messages according to user type (Subscriber or Customer).
  • Use Kafka SQL Windowing concepts to analyze the following details:
    • The number of trips started within fixed time intervals, using the Tumbling Window.
    • The number of trips started within overlapping, advancing time intervals, using the Hopping Window.
    • The number of trips started within session intervals, using the Session Window.

Synopsis

  • Set up Kafka cluster.
  • Produce and consume trip details using Custom partitioning.
  • Create trip data stream.
  • Perform streaming analytics using Window Tumbling.
  • Perform streaming analytics using Window Session.
  • Perform streaming analytics using Window Hopping.

Setting Up Kafka Cluster

To set up the cluster on a single server, with each broker in the cluster running on a different port, perform the following steps:

  • Run ZooKeeper on the default port 2181. By default, ZooKeeper stores its data in /tmp/data.
  • Change the default path (/tmp/data) to another path with enough space so that producing and consuming are not disrupted.
  • Edit the ZooKeeper configuration in the zookeeper.properties file available under the Confluent base path (etc/kafka/zookeeper.properties).

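The relevant change might look like the following (the replacement path is only an example; use any directory with enough space):

# etc/kafka/zookeeper.properties
dataDir=/data/zookeeper
clientPort=2181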

  • Start ZooKeeper using the following command:
./bin/zookeeper-server-start etc/kafka/zookeeper.properties

You can view the ZooKeeper startup output in the console.

  • Start the first broker in the cluster by running the default Kafka broker on port 9092 with broker ID 0. The default log path is /tmp/kafka-logs.
  • Edit the default log path (/tmp/kafka-logs) for the first broker in the server.properties file available under the Confluent base path (vi etc/kafka/server.properties).

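The key settings for the first broker might look like the following (the log directory path is an example):

# etc/kafka/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/data/kafka-logs-0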

  • Start the broker using the following command:
./bin/kafka-server-start etc/kafka/server.properties

You can view the first broker starting up with broker ID 0 on port 9092.

  • Start the second broker in the cluster by copying server.properties to server1.properties under etc/kafka/ and configuring it for the second broker.
  • Edit server1.properties (vi etc/kafka/server1.properties).

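For the second broker, the same three settings change so that it does not collide with the first (again, the log path is an example):

# etc/kafka/server1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/data/kafka-logs-1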

  • Start the broker using the following command:
./bin/kafka-server-start etc/kafka/server1.properties

You can view the second broker starting up with broker ID 1 on port 9093.

  • List the brokers available in the cluster using the following command:
./bin/zookeeper-shell localhost:2181 ls /brokers/ids

If both brokers have registered with ZooKeeper, the shell lists their broker IDs.
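The last line of the shell output should look something like this:

[0, 1]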

In the above case, two brokers are started on the same node. If the brokers run on different nodes, parallel message processing can be faster, and memory pressure from producing a large number of messages is relieved because the messages are spread across the memory of different nodes.

Producing and Consuming Trip Details Using Custom Partitioning

To produce and consume trip details using Custom partitioning, perform the following steps:

  • Create a topic trip-data with two partitions using the following command:
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-data --replication-factor 1 --partitions 2

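If the creation succeeds, the tool prints a confirmation similar to:

Created topic "trip-data".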

  • Describe the topic to view the leader of each partition that was created.
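The standard topics tool can be used for this:

./bin/kafka-topics --describe --zookeeper localhost:2181 --topic trip-data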

You can see that broker 0 is the leader of partition 0 and broker 1 is the leader of partition 1, so each broker handles the message transfer for its own partition.

  • Use the Custom partitioner technique to produce messages.
  • Create a CustomPartitioner class that implements Kafka's Partitioner interface, as shown below:

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    var partition = 0
    val tripData = value.asInstanceOf[String]
    // Gets the user type from the produced message (13th field of the CSV record)
    val userType = tripData.split(",")(12)
    // Assigns the partition based on the user type
    if ("Subscriber".equalsIgnoreCase(userType)) {
      partition = 0
    } else if ("Customer".equalsIgnoreCase(userType)) {
      partition = 1
    }
    println("Partition for message " + value + " is " + partition)
    partition
  }

  // Required by the Partitioner interface; nothing to configure or clean up here
  override def configure(configs: util.Map[String, _]): Unit = {}
  override def close(): Unit = {}
}

You can view the Subscriber user type messages being produced to partition 0 and the Customer user type messages to partition 1.

  • Define the CustomPartitioner class in the producer properties, as shown below:
//Splits messages to particular partitions
props.put("partitioner.class", "com.treselle.core.CustomPartitioner");
  • In the consumer, assign a specific partition of the topic to each consumer instance, as shown below (a fuller consumer sketch follows this list):
val topicPartition = new TopicPartition(TOPIC,partition)
consumer.assign(Collections.singletonList(topicPartition))
  • Pass the partition number as a command-line argument when running multiple consumers, so that each consumer listens to a different partition.
  • Start multiple consumers with different partitions.
  • Start Consumer1 using the below command:
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 0
  • Start Consumer2 using the below command:
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 1
  • Produce the trip details by defining the Custom partitioner using the below command:
java -cp custom_partitioner.jar com.treselle.core.CustomPartionedProducer trip-data localhost:9092
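For reference, here is a minimal sketch of what such a partition-pinned consumer might look like (the class and topic names follow the article; the deserializer and group settings are assumptions):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object ConsumerBasedOnPartition {
  def main(args: Array[String]): Unit = {
    // Arguments: topic, bootstrap servers, partition number (e.g. trip-data localhost:9092 0)
    val Array(topic, bootstrapServers, partitionArg) = args
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", "trip-consumer")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // Read only from the partition passed on the command line
    consumer.assign(Collections.singletonList(new TopicPartition(topic, partitionArg.toInt)))

    while (true) {
      // poll(long) is the pre-2.0 API; newer clients take a Duration
      val records = consumer.poll(1000)
      for (record <- records.asScala) {
        println(s"partition=${record.partition} value=${record.value}")
      }
    }
  }
}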

You can view consumer 1 consuming only Subscriber messages from partition 0 and consumer 2 consuming only Customer messages from partition 1.


  • Check the disk usage of the brokers after all messages have been consumed by both consumers. Comparing the brokers' log directories shows how the data was shared between them.

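One quick way to compare them is to check the size of each broker's log directory, for example (substitute the paths you configured in log.dirs):

du -sh /data/kafka-logs-0 /data/kafka-logs-1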

Here, the Subscriber messages are handled by broker localhost:9092 (partition 0) and the Customer messages by broker localhost:9093 (partition 1). As there are fewer Customer messages, the log directory of localhost:9093 occupies less space.

Creating Trip Data Stream

In KSQL, there is no option to consume messages from specific partitions; messages are consumed from all partitions of the given topic when a stream or table is created. To create a trip data stream, perform the following steps:

  • Separate the Subscriber and Customer data using a condition, for window processing.
  • Create trip_data_stream with the columns of the produced trip data using the following command:
CREATE STREAM
trip_data_stream
(
tripduration BIGINT,
starttime VARCHAR,
stoptime VARCHAR,
start_station_id BIGINT,
start_station_name VARCHAR,
start_station_latitude DOUBLE,
start_station_longitude DOUBLE,
end_station_id BIGINT,
end_station_name VARCHAR,
end_station_latitude DOUBLE,
end_station_longitude DOUBLE,
bikeid INT,
usertype VARCHAR,
birth_year VARCHAR,
gender VARCHAR
)
WITH
(
kafka_topic='trip-data',
value_format='DELIMITED'
);
  • Extract a Unix timestamp from the trip start times for windowing.
  • Set the extracted start-time Unix timestamp as the TIMESTAMP property of the stream, so that windowing uses the trip start times instead of the times at which the messages were produced.
  • Create the stream with the extracted Unix timestamp and only the Subscriber messages, for finding the subscribers' trip details, using the below command:
CREATE STREAM
subscribers_trip_data_stream
WITH
(
TIMESTAMP='startime_timestamp',
PARTITIONS=2
) AS
SELECT
STRINGTOTIMESTAMP(starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp,
tripduration,
starttime,
usertype
FROM TRIP_DATA_STREAM
WHERE usertype='Subscriber';
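Before moving on to windowing, the new stream can be sanity-checked with standard KSQL statements (this query is illustrative, not part of the original walkthrough):

SHOW STREAMS;

SELECT starttime, tripduration, usertype
FROM subscribers_trip_data_stream
LIMIT 5;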

Performing Streaming Analytics Using Window Tumbling

Window Tumbling groups data from a given interval into non-overlapping, fixed-size windows. It can be used, for example, for anomaly detection of the stream over a certain time interval. Consider tumbling with a time interval of five minutes:


To count the trips started by Subscribers in each five-minute interval, execute the following command:

SELECT
COUNT(*),
starttime
FROM subscribers_trip_data_stream
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY usertype;


From the above result, it is evident that 19 trips started in the first window (through the end of the fourth minute), 25 trips in the second window (through the end of the ninth minute), and 26 trips in the third window (through the end of the 14th minute). The started trips are thus counted within each given interval of time.

Performing Streaming Analytics Using Window Session

In the Window Session, data is grouped into sessions separated by periods of inactivity. For example, with a session interval of one minute, if no data arrives for one minute, the current session is closed and a new session is started to group the data that follows.


To group the subscribers' started trips into sessions, set the session interval to 20 seconds using the below command:

SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW SESSION (20 SECOND)
GROUP BY usertype;


From the result, it is evident that the data is grouped into sessions: whenever no data arrives for 20 seconds, a new session is started.

For example, consider the interval between 00:01:09 and 00:01:57. Between 00:01:09 and 00:01:33, no gap of 20 seconds or more occurs between trips, so the trip count keeps incrementing. Between 00:01:33 and 00:01:57, there is an inactivity gap of more than 20 seconds, so a new session is started at the 57th second.

Performing Streaming Analytics Using Window Hopping

In Window Hopping, data is grouped into fixed-size windows that overlap because each window advances by a smaller, given interval. For example, consider a five-minute window advancing by one minute:

To group the started trips into five-minute windows advancing by one minute, execute the following command for Hopping Window analysis:

SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW HOPPING (SIZE 5 MINUTE, ADVANCE BY 1 MINUTE)
GROUP BY usertype;


From the result, it is evident that each record appears in five windows, since a five-minute window advanced by one minute overlaps five times. The number of entries per record depends on the window size and the advance interval.

In the above example, consider the record at 00:02:12 to see how Hopping works with a five-minute window and a one-minute advance. The 00:02:12 record has five entries, with trip counts 7, 7, 7, 6, and 1. The first three windows each cover the entire stream from 00:00:00 through 00:02:12, so each counts seven trips. The fourth window starts at 00:01:00 and counts six trips, and the fifth window starts at 00:02:00 and counts only the one trip between 00:02:00 and 00:02:12.

