
Unbounded Stream Processing Using Apache Beam


Today, we are going to build a simple WordCount data pipeline using Apache Kafka for unbounded sources.


For the last two weeks, I have been experimenting with the Apache Beam API. I read the excellent documentation provided by Beam, and it helped me understand the basics. I recommend readers go through it before moving ahead.

Introduction

Today, we are going to build a simple WordCount data pipeline using Apache Kafka as an unbounded source. We could use any message broker for this application, such as Google Pub/Sub; Beam has a lot of built-in IO connectors for messaging. At the end of our pipeline, we will output the result to a text file.


Before we jump into the code, we need to be aware of certain streaming concepts such as windowing, triggers, and processing time vs. event time. I recommend reading the articles Streaming 101 and Streaming 102 by Tyler Akidau.

Alright, let's go ahead and do the setup!

Setup

  1. Set up a Java environment. We are going to use Beam's Java API.
  2. Install Zookeeper and Apache Kafka. If you are too lazy to do that, go here. (Don't worry about Yarn; just spin up Zookeeper and Kafka with the "bin/grid start all" and "bin/grid stop all" commands.)
  3. Add a $KAFKA_HOME variable to your .bashrc/.zshrc file and restart your terminal session (a sample export is shown after this list).
  4. Clone this Git repo here.
  5. Install Python.
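
For reference, the $KAFKA_HOME setup might look like the following (a minimal sketch; the install path is an assumption for your machine):

Shell
 
# Illustrative path; point this at your actual Kafka installation
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin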

After things are wired up, we will create a Kafka topic to push messages to. Use the command below.

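A typical topic-creation command looks like this (a sketch for a local setup; on newer Kafka versions, replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092):

Shell
 
# Create the "messenger" topic with local development settings
$KAFKA_HOME/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic messenger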

We will use Beam's Direct runner as the execution engine for this example. Go to the project home directory and execute this command to start the pipeline.

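With Maven, a Direct-runner invocation typically looks like the following (a sketch; the main class and profile name are assumptions, so check the repo's README for the exact command):

Shell
 
# The main class name below is illustrative
mvn compile exec:java \
    -Dexec.mainClass=com.example.KafkaWordCount \
    -Pdirect-runner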

That's it! The pipeline job is listening to the Kafka topic and ready to process the data.

Execute this command to push events to the Kafka topic "messenger".

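The console producer that ships with Kafka works well for this (the broker address assumes the local default; newer Kafka versions use --bootstrap-server instead of --broker-list):

Shell
 
# Each line typed into stdin becomes one Kafka record
$KAFKA_HOME/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic messenger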

Let's Talk About Code Now!

In this example, we are going to count the number of words in a given window (say, a 1-hour window). Windows in Beam are based on event time, i.e., time derived from the message timestamp rather than the system timestamp (processing time). Based on the event time, Beam will put each message into the appropriate window.

When the window reaches the set watermark (heuristic-based), i.e., when all the data for that window is expected to have arrived in the system, the window closes. Triggers provide flexible ways to kick off the computation (more on this here) on the events accumulated inside the window. In our case, we set the trigger to fire after the window closes.

The remaining operations, such as GroupByKey and Count.perKey (map-reduce), can then be performed on this result. The output of the map-reduce operation is persisted to a file partitioned by window timestamp.

Figure 1. Unbounded stream processing on event time and windowing operation

Our main class looks something like this:

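The full class lives in the repo; a condensed sketch of the pipeline, assuming illustrative names for the class and broker address, is:

Java
 
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class KafkaWordCount { // class name is illustrative

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Unbounded source: consume the "messenger" topic from Kafka.
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092") // illustrative broker address
            .withTopic("messenger")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // .withTimestampPolicyFactory(...) goes here; see the next section.
            .withoutMetadata())
        .apply(Values.<String>create())
        // A ParDo that splits each message into individual words is omitted for brevity.
        // Fixed 1-hour windows on event time, firing once the watermark
        // passes the end of the window.
        .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        // Count occurrences of each word within its window.
        .apply(Count.<String>perElement());
        // Formatting the resulting KV<String, Long> pairs and writing one
        // text file per window (TextIO with windowed writes) is omitted here.

    pipeline.run().waitUntilFinish();
  }
}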

Event Time and Watermark

A watermark is a system notion that indicates when all the data in a certain window can be expected to have arrived in the system. Generally, watermarks are derived from the source system itself, i.e., the Kafka consumer in our case. We have to configure the Kafka connector to use the message event time for watermark generation instead of processing time, which is the default.

In Beam's Kafka IO connector, we can set this configuration using the withTimestampPolicyFactory method, providing a custom policy to override the default behavior. An incorrect configuration leads to no results being output from the pipeline, which can be hard to debug.

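A custom policy might look like the following sketch, where both the record timestamp and the watermark are derived from the timestamp Kafka stores with each record (the class name is illustrative):

Java
 
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomFieldTimePolicy extends TimestampPolicy<String, String> {

  private Instant currentWatermark;

  public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
    // Resume from the previous watermark after a restart, or start from the minimum.
    currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, String> rec) {
    // Use the timestamp the producer stamped on the record as the event time.
    currentWatermark = new Instant(rec.getTimestamp());
    return currentWatermark;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return currentWatermark;
  }
}

It is then wired into the connector like so:

Java
 
KafkaIO.<String, String>read()
    // ... other settings as above ...
    .withTimestampPolicyFactory(
        (topicPartition, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))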

Another important part of processing events in event time is setting the event timestamp for each message in a PCollection, as shown below.

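A simple way to do this is Beam's WithTimestamps transform (a sketch; the MyEvent type and its getEventTimestamp() accessor are illustrative):

Java
 
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.joda.time.Instant;

// Stamp each element with the event time carried in its payload so that
// windowing assigns it by event time rather than arrival time.
events.apply(WithTimestamps.of(
    (MyEvent e) -> new Instant(e.getEventTimestamp())));

Note that WithTimestamps will reject timestamps that move an element backward in time beyond the allowed skew, which is another reason to get the source-level timestamp policy right first.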

Windowing uses this timestamp to assign each event to the appropriate window.

You are likely to face an error here if you forget to set the withTimestampPolicyFactory policy: we have configured windows to use event time, but the Kafka connector would still be using processing time (the default), which is always the latest, while the event time is at least milliseconds older.

Conclusion

I hope you have learned how to process unbounded streams in event time using Beam and Kafka. I have skipped many parts of the pipeline code, such as encoders/decoders, serdes for Kafka messages, writing one file per window, and other PTransforms, which are documented in the Beam Javadocs. Please explore the Git code at your leisure. Happy coding!


Further Reading

Making Sense of Stream Processing

Real-Time Stream Processing With Apache Kafka Part One

An Introduction to Stream Processing With Pulsar Function

Topics:
apache beam, big data, stream processing, tutorial

