
Use Apache Kafka to Transform a Batch Pipeline Into a Real-Time One, Part 2


In this post, we finish our discussion by examining aggregators in Kafka Streams and how to expose your data to end users.


Welcome back! If you missed Part 1, you can check it out here.

3) Reviews Aggregator Kafka Streams

Target Architecture

Our third application is also a Kafka Streams application. It's a stateful one, so the state will be transparently stored in Kafka. From the outside, it looks like the following:

[Figure: target architecture of the reviews aggregator Kafka Streams application]

KStream and KTables

In the previous section, we learned about the early concepts of Kafka Streams: taking a stream and splitting it in two based on a spam evaluation function. Now, we need to perform some stateful computations, such as aggregations and windowing, in order to compute statistics on our stream of reviews.

Thankfully we can use some pre-defined operators in the High-Level DSL that will transform a KStream into a KTable. A KTable is basically a table that gets new events every time a new element arrives in the upstream KStream. The KTable then has some level of logic to update itself. Any KTable updates can then be forwarded downstream. For a quick overview of KStream and KTable, I recommend the quickstart on the Kafka website.
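To make the KStream-to-KTable relationship concrete, here is a minimal, hedged sketch (the topic names, the store name, and the per-key count are illustrative assumptions, not part of the project; newer Kafka Streams versions pass the store name and serdes through Materialized and Produced instead):

// Minimal sketch: every new review arriving on the KStream updates the per-key count in the KTable.
// "some-reviews-topic", "review-counts-store" and "review-counts" are assumed names.
KStream<String, Review> reviews = builder.stream("some-reviews-topic");
KTable<String, Long> reviewCountPerKey = reviews
        .groupByKey()                      // group records that share the same key
        .count("review-counts-store");     // stateful: the table is updated on every upstream record
reviewCountPerKey.toStream()               // forward each KTable update downstream as a changelog stream
        .to(Serdes.String(), Serdes.Long(), "review-counts");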

Aggregation Key

In Kafka Streams, aggregations are always key-based, and our current stream messages have a null key. We want to aggregate over each course, so we first have to re-key our stream (by course id). Re-keying a stream in Kafka Streams is very easy if you look at the code:

KStream<String, Review> validReviews = builder.stream("udemy-reviews-valid")
        .selectKey((key, review) -> review.getCourse().getId());

But you need to be aware of something: when you re-key a KStream and chain that with some stateful aggregations (and we will), the Kafka Streams library writes the re-keyed stream back to Kafka and then reads it again. That network round trip is needed for data distribution, parallelism, state storage, and recovery, and it can be an expensive operation. So be efficient when you change the key of your stream!
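If you want to make that round trip visible and control the intermediate topic yourself, one hedged option with this generation of the API is to materialize the re-keyed stream explicitly with through() (the topic name below is an assumption, and the topic must be created up front with the desired partition count):

// Optional, illustrative: write the re-keyed stream to an explicit topic and read it back,
// instead of letting Kafka Streams create an internal repartition topic.
KStream<String, Review> rekeyedReviews = validReviews.through("udemy-reviews-valid-by-course");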

Statistics Since Course Inception

Now, we profit! We have the guarantee that all the reviews belonging to one course will always go to the same Kafka Streams application instance. As our topic holds all the reviews since inception, we just need to create a KTable out of our stream and sink that somewhere.

// we build a long-term topology (since inception)
KTable<String, CourseStatistic> longTermCourseStats = validReviews
        .groupByKey()
        .aggregate(this::emptyStats, this::reviewAggregator, courseStatisticSpecificAvroSerde);
longTermCourseStats.toStream().to("long-term-stats");

Good things to note:

  • You need to define what emptyStats() looks like (course statistics with 0 reviews); see the source code for the implementation, or the hedged sketch right after this list.
  • You need to define how your stats change after a new review comes in (that's your aggregator).
  • Each new review is seen as new data, not an update. The KTable has no recollection of past reviews. If you wanted to compute the statistics on updates as well, you could change the event format to capture the "old" and "new" review state within one message.
  • You should make sure your source topic does not expire data! It's a topic config. For this, you could either enable log compaction or set retention.ms to something like 100 years. As Jay Kreps (creator of Kafka, CEO of Confluent) wrote, it's okay to store data in Kafka.
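For reference, here is a hedged sketch of what those two methods could look like. CourseStatistic and Review are the project's Avro-generated classes, and the field names below (count of reviews, average rating, review rating) are assumptions, so adapt them to the real schema:

// Hedged sketch only; the real implementation lives in the source code.
private CourseStatistic emptyStats() {
    // course statistics with 0 reviews (set any other mandatory Avro fields your schema requires)
    return CourseStatistic.newBuilder()
            .setCountReviews(0L)
            .setAverageRating(0.0)
            .build();
}

private CourseStatistic reviewAggregator(String courseId, Review review, CourseStatistic current) {
    // incrementally fold one new review into the running statistics for that course
    long newCount = current.getCountReviews() + 1;
    double newAverage =
            (current.getAverageRating() * current.getCountReviews() + review.getRating()) / newCount;
    return CourseStatistic.newBuilder(current)
            .setCourseId(courseId)
            .setCountReviews(newCount)
            .setAverageRating(newAverage)
            .build();
}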

Statistics for the Past 90 Days

Here comes the fun and funky part. When dealing with streaming data, most of the time a business application will only require us to analyze events over a time window. Some use cases include:

  • Am I under DDoS? (sudden peak of data)
  • Is a user spamming my forums? (high number of messages over a short period for a specific user-id)
  • How many users were active in the past hour?
  • How much financial risk does my company carry right now?

For us, this will be: "what is each course statistic over the past 90 days?"

Note that the aggregation computation is exactly the same. The only thing that changes over time is the data set we apply that aggregation to. We want it to be recent (from the past 90 days), computed over a time window, and we want that window to advance every day. In Kafka Streams, this is called a hopping window. You define how big the window is and the size of the hop. Finally, to handle late-arriving data, you define how long you're willing to keep a window around (more on that right after the snippet):

// A hopping time window with a size of 91 days and an advance interval of 1 day. 
// the windows are aligned with epoch 
long windowSizeMs = TimeUnit.DAYS.toMillis(91);
long advanceMs = TimeUnit.DAYS.toMillis(1);
TimeWindows timeWindows = TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
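The snippet above does not show the retention part. With this generation of the API, how long a window is kept (and therefore how late a review can arrive and still be counted) could be set with until(); the 100-day figure below is an assumption:

// keep each window around for 100 days so late-arriving reviews can still be applied (assumed value)
timeWindows = timeWindows.until(TimeUnit.DAYS.toMillis(100));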

Please note that this will generate about 90 different windows at any time. We will only be interested in the most recent one.

We filter to keep only recent reviews (this really helps speed up catching up with the stream), and we compute the course statistics over each time window:

KTable<Windowed<String>, CourseStatistic> windowedCourseStatisticKTable = validReviews
        .filter((k, review) -> !isReviewExpired(review))
        .groupByKey()
        .aggregate(this::emptyStats, this::reviewAggregator, timeWindows, courseStatisticSpecificAvroSerde);

That operation can become a bit costly, as we keep 90 time windows for each course while only caring about one specific window (the last one). Unfortunately, we cannot perform aggregations on sliding windows (yet), but hopefully that feature will appear soon! It is still good enough for our needs.

In the meantime, we need to filter to keep only the window we're interested in: the window that ends after today but before tomorrow (a hedged sketch of the helpers used here follows the snippet):

KStream<String, CourseStatistic> recentStats = windowedCourseStatisticKTable.toStream()
        // we keep the current window only
        .filter((window, courseStat) -> keepCurrentWindow(window))
        // and re-key by the course id
        .selectKey((k, v) -> k.key());
recentStats.to("recent-stats");
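The two helpers used above, isReviewExpired() and keepCurrentWindow(), are not shown here. A hedged sketch of what they might look like (the review timestamp accessor is an assumption; the exact bounds live in the source code):

// Hedged sketch of the two predicates used in the topology above.
private boolean isReviewExpired(Review review) {
    // drop reviews older than the window size (getCreated() is an assumed Avro timestamp field)
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(91);
    return review.getCreated() < cutoff;
}

private boolean keepCurrentWindow(Windowed<String> windowedKey) {
    // the "current" window is the one that ends after now but within the next day (the hop size)
    long now = System.currentTimeMillis();
    long windowEnd = windowedKey.window().end();
    return windowEnd > now && windowEnd < now + TimeUnit.DAYS.toMillis(1);
}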

And that’s it, we get a topic fully updated in real-time with the most recent stats for our course.

Running the App

Running the application is easy: you just start it like any other Java application. We first ensure the target topics are properly created:

$ kafka-topics --create --topic long-term-stats --partitions 3 --replication-factor 1 --zookeeper localhost:2181
$ kafka-topics --create --topic recent-stats --partitions 3 --replication-factor 1 --zookeeper localhost:2181

And then to run:

(from the root directory)
$ mvn clean package
$ java -jar udemy-reviews-aggregator/target/uber-udemy-reviews-aggregator-1.0-SNAPSHOT.jar

Feel free to fire up a few Avro consumers to see the results:

$ kafka-avro-console-consumer --topic recent-stats --bootstrap-server localhost:9092 --from-beginning 
$ kafka-avro-console-consumer --topic long-term-stats --bootstrap-server localhost:9092 --from-beginning

Results may include a stream of:

{"course_id":1294188,"count_reviews":51,"average_rating":4.539}
{"course_id":1294188,"count_reviews":52,"average_rating":4.528}
{"course_id":1294188,"count_reviews":53,"average_rating":4.5}

We now have two topics that get a stream of updates for long-term and recent stats, which is pretty cool. By the way, these topics are very good candidates for log compaction: we only really care about the last value for each course. Step 3: done.
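For example, you could turn on compaction for those topics with something like the following (or pass --config cleanup.policy=compact directly when creating them):

$ kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name recent-stats --add-config cleanup.policy=compact
$ kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name long-term-stats --add-config cleanup.policy=compact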

Notes

Although the Kafka Streams syntax looks quite simple and natural to understand, a lot happened behind the scenes. Here are a few things to note:

  • Exactly once: as we want that aggregation to be perfectly accurate, we need to enable exactly-once processing semantics (EOS). This feature appeared in 0.11, and the name stirred up a lot of debate. To make it short and clear, it means "effectively once," which is exactly what we need (pun intended). It means no review will somehow be counted twice in case of broker, network, or application failure. Neat! (See the configuration sketch after this list.)
  • Incoming data format: as mentioned before, it would be great if the data had a "new" and an "old" field. This would allow us to handle updates to reviews and compute the correct average when a review is updated.
  • Windowed aggregations: there is a massive performance hit to computing 90 windows only to discard them all and keep the last one. I have evaluated it and found it to be 25 times less efficient than using the (way more advanced) lower level API.
  • Lower Level API: using this API, you can create your own transformers and compute exactly what you need. In the source code, you can find how to do the recent statistics computations using that API, although I won’t discuss it in this post as it goes way beyond the already immense quantity of information I just threw at you.
  • Performance: these apps can be parallelized up to the number of partitions in the incoming topic. Kafka Streams scales horizontally natively, which is quite awesome, and in that regard makes it really easy to scale without maintaining some sort of back-end cluster.
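To illustrate the first point in the list above: enabling EOS in a Kafka Streams application is a single configuration property (available from Kafka 0.11 onwards). A minimal sketch, where the application id and bootstrap server are assumptions:

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "udemy-reviews-aggregator");   // assumed app id
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// the one line that turns on exactly-once ("effectively once") processing semantics
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);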

…One last component!

4) Kafka Connect Sink — Exposing That Data Back to the Users

Eventually, all we care about is people browsing the Udemy website and visualizing the course statistics. As with most web services, serving that information is often backed by some sort of database. For my example, I have chosen a relational database (PostgreSQL), but one could choose a NoSQL one like MongoDB, or a search index such as Elasticsearch. The possibilities are endless, and there are Kafka Connect Sinks for pretty much any technology out there.


Kafka Connect

Kafka Connect is a framework upon which developers can create connectors. These connectors come in two kinds: Sources and Sinks. Sources are producers, Sinks are consumers. The beautiful thing about Kafka Connect is that it provides the infrastructure to run any connector; for an end user, running a connector is as easy as pushing configuration. Re-using other people's work sounds like a dream, right? Well, that's what Kafka Connect is about.

To learn about Kafka Connect in detail, check out my Kafka Connect course.

The JDBC Sink Connector

Here's the good news: I'm not going to show you any more Java code. We're not going to re-invent the wheel to put our topic data into a PostgreSQL table. Instead, we're going to leverage a well-written and battle-tested Kafka connector by just pushing a bit of configuration.

We are using the excellent Kafka Connect JDBC Sink by Confluent. The configuration itself is dead simple:

name=SinkTopics
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=postgres
insert.mode=upsert
pk.mode=record_value
pk.fields=course_id
auto.create=true
topics=recent-stats,long-term-stats
key.converter=org.apache.kafka.connect.storage.StringConverter

Things to note:

  • tasks.max=3: that's the level of parallelism of your connector. It means we will spin up at most three tasks to read the input topics. You can increase that number to scale up, up to the number of partitions you're reading from, because any Kafka Connect Sink is, behind the scenes, just a Kafka consumer.
  • key.converter: I chose to have my topics keyed by course id, exposed as a String. Since the default converter provided to the Connect workers is Avro, it would throw an error if I didn't override key.converter. Hence, we use the simplistic StringConverter here.
  • You could deploy many connectors (more than one configuration) to a Kafka Connect cluster. Benefits? Well, we could sink our topics into 10 different databases and 10 different technologies, to serve different purposes and applications in your organization, all from the same Connect cluster. We could also extend the list of topics to sink so that some data scientists can perform some cool analysis on your fraud algorithm's effectiveness, for example. (A hedged sketch of how to launch this connector follows this list.)
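As a hedged example of launching it, you could run the connector in standalone mode by pointing the worker at the configuration above. Both file names below are assumptions: worker.properties holds the usual worker settings (bootstrap servers, converters, offsets file), and jdbc-sink.properties holds the connector configuration shown earlier. In distributed mode, you would instead POST the same configuration as JSON to the Connect REST API.

$ connect-standalone worker.properties jdbc-sink.properties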


Last but Not Least — Final Notes

As you have seen, the possibilities in Kafka are endless. The ecosystem is extremely large and there are tons of patterns and cool concepts to learn. The takeaways I want you to have today are the following:

  • Event sourcing in Kafka is awesome. Getting a stream of every event that has happened in your company ever could be a dream come true.
  • Kafka is an excellent candidate as a backbone for your microservices. Break down some complex flows into easy ones, and make each microservice perform its core capability at its best. If the fraud application improves, there would be no disruption to your other systems!
  • Use the Confluent Schema Registry. Data is a first-class citizen in Apache Kafka, and schemas make everyone's life so much simpler. Documentation is embedded, and parsing errors are virtually nonexistent. You can even make your schemas evolve over time, as long as you ensure they're forward and backward compatible.
  • Leverage the right tools for each job. As you've seen, there was a mix of Producer, Streams, and Connect. I made every effort not to re-invent the wheel. Take your time to evaluate solutions before diving right into your favorite technology.
  • Never stop learning. I have been using Kafka for over a year now, and I keep on learning every day. I also want to share my experience, so check out Apache Kafka for Beginners, Kafka Connect, Kafka Streams, Kafka Setup & Administration, Confluent Schema Registry & REST Proxy, Apache Kafka Security.
  • What this blog did not cover (and the range of stuff there’s yet to learn or write about): Kafka Consumers API, Kafka Admin Client, Kafka Streams Lower Level API, Kafka Streams joins to enrich data, Kafka Connect Source, Kafka Security, Kafka Monitoring, Kafka Setup and Administration, Kafka REST Proxy, KSQL, Zookeeper (and I might have forgotten other things). The ecosystem is huge.
  • KSQL is the future: Most, if not all, of the Kafka Streams applications written in this blog could be replaced by only a few KSQL statements as soon as it has official Avro support. It will open up stream processing to a much wider audience and enable the rapid migration of many batch SQL applications to Kafka. I plan on publishing a subsequent blog when I migrate the code to KSQL. Stay tuned!

Kafka is a fantastic piece of technology. I am convinced it will make all organizations thrive in flexibility and reactiveness. There is a ton to learn about Kafka, and I sincerely hope that through this blog, I have clearly exposed how to chain microservices in order to transform a batch pipeline into a real-time one.

Share, comment, give me feedback. I’d love to hear your thoughts! Thanks to Michael, Gwen, Cam, Octav and Eric for proofreading and providing improvements :)

Happy learning!
