Unbounded Stream Processing Using Apache Beam
Today, we are going to build a simple WordCount data pipeline using Apache Kafka for unbounded sources.
For the last two weeks, I have been experimenting with the Apache Beam API. I read the excellent documentation provided by Beam, and it helped me understand the basics. I recommend that readers go through it before we move ahead.
Today, we are going to build a simple WordCount data pipeline using Apache Kafka as an unbounded source. We could use any message broker for this application, such as Google Pub/Sub. Beam has a lot of built-in IO connectors for messaging. At the end of our pipeline, we will write the result to a text file.
Before we jump into the code, we need to be aware of certain streaming concepts such as windowing, triggers, and processing time vs. event time. I recommend reading the articles Streaming 101 and Streaming 102 by Tyler Akidau.
Alright, let's go ahead and do the setup!
- Set up a Java environment. We are going to use Beam's Java API.
- Install Zookeeper and Apache Kafka. If you would rather not install them manually, go here (don't worry about Yarn; just start and stop Zookeeper and Kafka using the "bin/grid start all" and "bin/grid stop all" commands).
- Add the $KAFKA_HOME variable to your .bashrc/.zshrc file and restart your terminal session.
- Clone this Git repo here
- Install Python
Once everything is wired up, we will create a Kafka topic to push messages to. Use the command below:
We will use Beam's Direct runner as the execution engine for this example. Go to the project home directory and execute this command to start the pipeline.
That's it! The pipeline job is listening to the Kafka topic and ready to process the data.
Execute this command to push an event to the Kafka topic "messenger".
Let's Talk About Code Now!
In this example, we are going to count the number of words for a given window size (say, a 1-hour window). Windows in Beam are based on event time, i.e., time derived from the message timestamp rather than the system timestamp (processing time). Based on the event time, Beam puts each message into the appropriate window.
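To make the window assignment concrete, here is a minimal plain-Java sketch of how an event timestamp maps to a fixed, epoch-aligned 1-hour window. It only illustrates the idea and is not Beam's implementation; the class name, method names, and the sample timestamp are assumptions made up for this example.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; in a real pipeline Beam's windowing does this for us.
final class FixedWindowSketch {

    // Start (inclusive) of the epoch-aligned fixed window that contains eventTime.
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long ts = eventTime.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // End (exclusive) of that window.
    static Instant windowEnd(Instant eventTime, Duration size) {
        return windowStart(eventTime, size).plus(size);
    }

    public static void main(String[] args) {
        // Hypothetical message timestamp (event time), not taken from the article.
        Instant eventTime = Instant.parse("2019-11-05T10:42:17Z");
        Duration oneHour = Duration.ofHours(1);
        System.out.println("window: [" + windowStart(eventTime, oneHour)
                + ", " + windowEnd(eventTime, oneHour) + ")");
    }
}
```

Note that the window depends only on the timestamp carried by the message, not on when the message happens to be processed.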
When the watermark (which is heuristic-based) passes the end of a window, i.e., all the data for that window is expected to have arrived in the system, the window closes. Triggers provide flexible ways to kick off the computation (more on this here) on the events accumulated inside a window. In our case, we set the trigger to fire after the window closes.
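The relationship between the watermark and a window closing can be sketched in a few lines of plain Java. This is a simplified model under my own assumptions (a window counts as closed once the watermark reaches its end, and no lateness is allowed); the class and method names are hypothetical and it is not how Beam implements triggers internally.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of "fire when the watermark passes the end of the window".
final class WatermarkTriggerSketch {

    // A window [start, end) is treated as closed once the watermark reaches its end.
    static boolean windowClosed(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    // An element is late if the window it belongs to is already closed on arrival.
    static boolean isLate(Instant eventTime, Duration windowSize, Instant watermark) {
        long sizeMs = windowSize.toMillis();
        long ts = eventTime.toEpochMilli();
        Instant end = Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs) + sizeMs);
        return windowClosed(end, watermark);
    }

    public static void main(String[] args) {
        Duration oneHour = Duration.ofHours(1);
        Instant watermark = Instant.parse("2019-11-05T11:05:00Z"); // hypothetical watermark
        // Belongs to the 10:00 window, which the watermark has already passed.
        System.out.println(isLate(Instant.parse("2019-11-05T10:42:17Z"), oneHour, watermark));
        // Belongs to the 11:00 window, which is still open.
        System.out.println(isLate(Instant.parse("2019-11-05T11:01:00Z"), oneHour, watermark));
    }
}
```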
The remaining operations, such as GroupBy and CountPerKey (map-reduce), can then be performed on this result. The output of the map-reduce operation is persisted to files partitioned by window timestamp.
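As a rough picture of what the grouping and counting step produces, the following plain-Java sketch groups words by their window and counts them per key. It is not the pipeline code from the repo; the Event class, the sample lines, and the splitting rule (lowercase, split on whitespace) are assumptions chosen for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for "window, then group by word, then count per key".
final class WindowedWordCountSketch {

    // Hypothetical event shape: a line of text plus its event-time timestamp.
    static final class Event {
        final Instant timestamp;
        final String line;

        Event(Instant timestamp, String line) {
            this.timestamp = timestamp;
            this.line = line;
        }
    }

    // Returns window start -> (word -> count), using epoch-aligned fixed windows.
    static Map<Instant, Map<String, Long>> countWords(List<Event> events, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        Map<Instant, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            long ts = e.timestamp.toEpochMilli();
            Instant windowStart = Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
            Map<String, Long> perWindow = counts.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : e.line.trim().toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    perWindow.merge(word, 1L, Long::sum); // count per key within the window
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(Instant.parse("2019-11-05T10:05:00Z"), "hello beam"),
                new Event(Instant.parse("2019-11-05T10:50:00Z"), "hello kafka"),
                new Event(Instant.parse("2019-11-05T11:10:00Z"), "hello again"));
        // One entry per window, which mirrors writing one output file per window.
        countWords(events, Duration.ofHours(1))
                .forEach((window, words) -> System.out.println(window + " -> " + words));
    }
}
```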
Figure 1. Unbounded Stream processing on event time and windowing operation
Our main class looks something like this:
Event Time and Watermark
A watermark is a system notion that indicates when all the data in a certain window can be expected to have arrived in the system. Generally, watermarks are derived from the source system itself, i.e., the Kafka consumer in our case. We have to configure the Kafka connector to use the message event time for watermark generation instead of processing time, which is the default.
In Beam's Kafka IO connector, we can set this configuration using the withTimestampPolicyFactory method. Here we provide a custom policy to override the default behavior. An incorrect configuration can lead to no results being output from the pipeline, which is sometimes hard to debug.
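To give a feel for what an event-time-based watermark policy does, here is a plain-Java sketch of one common heuristic: the watermark trails the largest event timestamp seen so far by a fixed delay and never moves backwards. This is my own simplified model with hypothetical names, not the policy used in the repo and not the exact behavior of the policies that ship with Beam's Kafka connector.

```java
import java.time.Duration;
import java.time.Instant;

// Heuristic watermark: (max event time seen so far) minus a fixed allowed delay.
final class EventTimeWatermarkSketch {
    private final Duration maxDelay;
    private Instant watermark = Instant.MIN; // nothing observed yet

    EventTimeWatermarkSketch(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Feed one record's event timestamp and get the updated watermark back.
    Instant observe(Instant eventTime) {
        Instant candidate = eventTime.minus(maxDelay);
        if (candidate.isAfter(watermark)) {
            watermark = candidate; // advance only; out-of-order records never pull it back
        }
        return watermark;
    }

    Instant currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        EventTimeWatermarkSketch policy = new EventTimeWatermarkSketch(Duration.ofMinutes(1));
        System.out.println(policy.observe(Instant.parse("2019-11-05T10:00:00Z")));
        System.out.println(policy.observe(Instant.parse("2019-11-05T09:58:00Z"))); // out of order
        System.out.println(policy.observe(Instant.parse("2019-11-05T10:05:00Z")));
    }
}
```

The point to take away is that the watermark follows the timestamps inside the messages. If it followed the wall clock instead, it would run ahead of the event times that the windows are built on.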
Another important part of processing events in event time is setting the event timestamp for each message in a PCollection, as shown below:
The window uses this timestamp to move events to appropriate windows.
You are likely to face this error message if you forget to set the withTimestampPolicyFactory policy: we have set the windows to use event time, but the Kafka connector uses processing time by default, which is always the latest time, so the event time is milliseconds older than the processing time.
I hope you have learned how to process an unbounded stream in event time using Beam and Kafka. I have skipped many parts of the pipeline code here, such as encoders/decoders, serde for Kafka messages, writing one file per window, and other PTransforms, which are documented in the Beam Javadocs. Please explore the Git code at your leisure. Happy coding!