
Unbounded Stream Processing Using Apache Beam

Today, we are going to build a simple WordCount data pipeline using Apache Kafka for unbounded sources.

By Sunil pandith · Mar. 09, 20 · Tutorial

For the last two weeks, I have been experimenting with the Apache Beam API. I read the excellent documentation provided by Beam, and it helped me understand the basics. I recommend readers go through it before we move ahead.

Introduction

Today, we are going to build a simple WordCount data pipeline using Apache Kafka as an unbounded source. We could use any message broker for this application, such as Google Pub/Sub. Beam has a lot of built-in IO connectors for messaging. At the end of our pipeline, we will write the result to a text file.

Before we jump into the code, we need to be aware of certain streaming concepts such as windowing, triggers, and processing time vs. event time. I recommend reading the articles Streaming 101 and Streaming 102 by Tyler Akidau.

Alright, let's go ahead and do the setup!

Setup

  1. Set up a Java environment. We are going to use Beam's Java API.
  2. Install Zookeeper and Apache Kafka. If you are too lazy to do that, go here. (Don't worry about Yarn; just spin up Zookeeper and Kafka using the "bin/grid start all" and "bin/grid stop all" commands.)
  3. Add the KAFKA_HOME variable to your .bashrc/.zshrc file and restart your terminal session:

     Shell
     export KAFKA_HOME=<install folder>/hello-samza/deploy/kafka

  4. Clone this Git repo here.
  5. Install Python.

After things are wired up, we will create a Kafka topic to push messages to. Use the command below:

Shell

# create topic by name "messenger"
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic messenger

# to list the existing topics
./bin/kafka-topics.sh --list --zookeeper localhost:2181



We will use Beam's Direct Runner as the execution engine for this example. Go to the project home directory and execute this command to start the pipeline:

Shell

mvn compile exec:java -Dexec.mainClass=com.sunil.WindowedWordCount -Pdirect-runner -Dexec.args="--output=<output folder>"



That's it! The pipeline job is listening to the Kafka topic and ready to process the data.

Execute this command to push an event to the Kafka topic "messenger":

Shell

cd scripts
python3 GenMessage.py <name> <message<optional>> <epoch time in ms<optional>>
# example: python3 GenMessage.py sunil
# {"name":"sunil","message": "This message is from sunil","ets": 1580054229156}


Let's Talk About Code Now!

In this example, we are going to count the number of words within a given window size (say, a 1-hour window). Windows in Beam are based on event time, i.e., the time derived from the message timestamp rather than the system timestamp (processing time). Based on the event time, Beam will put each message into an appropriate window.
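
As a quick illustration of that assignment (this snippet is not part of the pipeline code, and the timestamp value is made up), with 1-minute fixed windows an event whose "ets" resolves to 16:37:09 lands in the window [16:37:00, 16:38:00):

Java

// Illustration only: how FixedWindows maps an event timestamp to a window.
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
Instant eventTime = Instant.parse("2020-01-26T16:37:09.000Z");
IntervalWindow window = windowFn.assignWindow(eventTime);
// window.start() -> 2020-01-26T16:37:00.000Z, window.end() -> 2020-01-26T16:38:00.000Z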

When the window reaches the set watermark (heuristic-based), i.e., when all the data for a certain window is expected to have arrived in the system, the window closes. Triggers provide flexible ways to kick off the computation (more on this here) on the events accumulated inside the window. In our case, we trigger after the window closes.
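
For illustration only (this is not what the pipeline below uses), the same watermark trigger can be extended with early, speculative firings and with late firings for data that arrives within the allowed lateness; a hedged sketch:

Java

// Illustration only: a more elaborate trigger than the one used in this pipeline.
.apply("apply window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
                // speculative (early) pane 30 seconds after the first element in the window
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30)))
                // fire again for each late element within the allowed lateness
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(1))
        .accumulatingFiredPanes())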

The remaining operations, such as GroupBy and CountPerKey (map-reduce), can then be performed on this result; a sketch of such a transform is shown after the figure. The output of the map-reduce operation is persisted to a file partitioned by window timestamp.

Figure 1. Unbounded Stream processing on event time and windowing operation
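
The CountWords transform applied in the pipeline below is not reproduced in this article (it lives in the repo). The following is only a minimal sketch of what such a composite PTransform could look like, assuming the standard Beam WordCount approach of splitting each message into words and counting per element; the split regex is an assumption:

Java

import java.util.Arrays;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// A minimal sketch of a CountWords composite transform; the actual class is in the repo.
public class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> messages) {
        return messages
                // split each message into individual words
                .apply("split into words", FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
                // count occurrences of each word (per window, since windowing is applied upstream)
                .apply("count per word", Count.perElement());
    }
}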

Our main class looks something like this:

Java

public class WindowedWordCount {

    static void runWithOptions(WindowedWordCountOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        Duration WINDOW_TIME = Duration.standardMinutes(1);
        Duration ALLOWED_LATENESS = Duration.standardMinutes(1);

        CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(Record.class, new RecordSerializableCoder());

        pipeline.apply(
                KafkaIO.<Long, Record>read()
                        .withBootstrapServers(options.getBootstrap())
                        .withTopic(options.getInputTopic())
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(RecordDeserializer.class)
                        .withTimestampPolicyFactory((tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
                        .withoutMetadata()
        )
                .apply(Values.<Record>create())
                .apply("append event time for PCollection records", WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
                .apply("extract message string", MapElements
                        .into(TypeDescriptors.strings())
                        .via(Record::getMessage))
                .apply("apply window", Window.<String>into(FixedWindows.of(WINDOW_TIME))
                        .withAllowedLateness(ALLOWED_LATENESS)
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .accumulatingFiredPanes()
                )
                .apply("count words", new CountWords())
                .apply("format result to String", MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
                .apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));

        pipeline.run();
    }

    public static void main(String[] args) {
        WindowedWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(WindowedWordCountOptions.class);
        options.setStreaming(true);
        runWithOptions(options);
    }
}
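
The WindowedWordCountOptions interface referenced in main() is also part of the repo and is not shown here. A minimal sketch of what it might look like follows; the option names mirror the getters used above, while the @Description texts and @Default values are assumptions for illustration:

Java

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;

// A minimal sketch of the pipeline options; the real interface lives in the repo.
public interface WindowedWordCountOptions extends StreamingOptions {

    @Description("Kafka bootstrap servers")
    @Default.String("localhost:9092")
    String getBootstrap();
    void setBootstrap(String bootstrap);

    @Description("Kafka topic to read from")
    @Default.String("messenger")
    String getInputTopic();
    void setInputTopic(String inputTopic);

    @Description("Output folder/prefix for the windowed word counts")
    String getOutput();
    void setOutput(String output);
}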



Event Time and Watermark

A watermark is a system notion that indicates when all the data in a certain window can be expected to have arrived in the system. Generally, watermarks are derived from the source system itself, i.e., the Kafka consumer in our case. We have to configure the Kafka connector to use the message's event time for watermark generation instead of processing time, which is the default.

In Beam's Kafka IO connector, we can set this configuration using the withTimestampPolicyFactory method. Here we provide a custom policy to override the default behavior. An incorrect configuration would lead to no results being output from the pipeline, which can be hard to debug.

Java

/**
 * Custom TimestampPolicy for Kafka source to manage timestamp and watermark when it pulls data from the broker
 */
public class CustomFieldTimePolicy extends TimestampPolicy<Long, Record> {

    protected Instant currentWatermark;

    public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<Long, Record> record) {
        currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}
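
The Record class used by this policy and by the pipeline is also defined in the repo. A minimal sketch of the POJO it assumes is shown below; the field names match the JSON produced by GenMessage.py, and the getTimestamp() accessor used above is assumed to return the same "ets" value that getEts() exposes:

Java

import java.io.Serializable;

// A minimal sketch of the Record POJO; the real class (plus its coder and
// Kafka deserializer) lives in the Git repo.
public class Record implements Serializable {
    private String name;
    private String message;
    private long ets;   // event timestamp in epoch milliseconds

    public Record() {}  // no-arg constructor for deserialization

    public String getName() { return name; }
    public String getMessage() { return message; }
    public long getEts() { return ets; }

    // assumed alias used by CustomFieldTimePolicy above
    public long getTimestamp() { return ets; }
}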



Another important part of processing events in event time is setting the event timestamp for each message in a PCollection, as shown below:

Java

... ...
.apply("append event time for PCollection records", WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
... ...



The window uses this timestamp to move events to appropriate windows.

You are likely to face an error message when you forget to set the withTimestampPolicyFactory policy, because we have set the windows to use event time while the Kafka connector uses processing time (the default), which is always the latest; the event time will be milliseconds older than the processing time.

Conclusion

I hope you have learned how to process an unbounded stream in event time using Beam and Kafka. I have skipped many parts of the pipeline code, such as encoders/decoders, serde for Kafka messages, writing one file per window, and other PTransforms, which are documented in the Beam Javadocs. Please explore the Git code at your leisure. Happy coding!


Further Reading

Real-Time Stream Processing With Apache Kafka Part One

An Introduction to Stream Processing With Pulsar Function

