
Kafka Streams: Catching Data in the Act (Part 1)

Each sensor's data stream needs to be considered in conjunction with data from the other sensors in order to compute a health metric for the process. We'll use Kafka Streams to do just that.

· Big Data Zone ·

I have been playing with Kafka on and off lately. It is an excellent addition to the ecosystem of big data tools where scale with reliability is imperative. I find it intuitive and conceptually simple (in keeping with the KISS principle), with the focus squarely on reliability at scale. Unlike traditional messaging systems that attempt to do many things, Kafka offloads some tasks, such as consuming messages, to the users, where they rightly belong anyway. Kafka employs the dumb-broker-smart-consumer paradigm, whereas the traditional systems do the opposite. In Kafka, the users get full control over the consumption of messages (re-processing older messages if they want to, for example) and Kafka gets to focus on message throughput, persistence, parallelism, etc. It is a win-win. More importantly, this design allows Kafka to natively support complex stream processing, so Kafka now describes itself as a "distributed streaming platform" rather than a pub-sub messaging system.

The purpose of this post is illustration. We pick a simple, made-up (but intuitive) set of data streams generated by different sensors. Each stream of data on its own is inconclusive in assessing the overall health of the process. They need to be looked at in the aggregate, i.e. in conjunction with data from the other sensors in order to compute a health metric for the process. And we will use Kafka Streams to do just that.

Stream Processing

Stream processing allows for catching the data in the act, so to speak. This is as opposed to waiting to ingest that data on a back-end platform and then running health queries to check if all went well. If we do find in that case that it did not go well — too bad, it already happened. The best one can do is learn from it and update the processes so the situation doesn't repeat. When this is unacceptable, we need the means to analyze the data as it flows so we can either take a remedial action or at least alert — in real-time. Having this ability to generate preliminary alerts based on incoming data "as it happens" can be useful before a deep dive later on as needed. Kafka, as the data bus, has been coupled with tools such as Spark for stream processing. But this adds extra layers and clusters to the platform. If one is not already heavily invested in these other tools for stream processing, Kafka Streams would be a natural choice, as the data is already being fielded by Kafka, anyway. I do see people converting their apps to use Kafka Streams on that simple account.

A data stream is an unbounded, ever-growing set of events, and such streams arise routinely in daily life. In many cases, there is a need for real-time analysis across multiple streams of data for critical decision-making. For example:

  • Health monitors. Often, there are multiple monitors measuring different aspects, and they need to be considered together, rather than each on their own.
  • Reaction chambers. Monitoring a reaction chamber for temperature, pressure, and concentrations of hazardous gases/liquids is essential for safe operation. Each feature has its own sensor throwing off a stream of data. As the physicochemical processes are coupled, these data streams should be evaluated together to diagnose unsafe conditions.
  • Leak detection. The upstream flow gets split across multiple downstream pipes in a dynamic fashion as the valves are operated. The total downstream flow should still equal the upstream flow, however, or else there is a leak.
  • Earthquake sensors. The shear forces during an earthquake distort structures, and the resulting displacement at critical junctions can lead to a complete collapse. If these junctions have been fitted with sensors that can send their positional data (and that remain functional during and after the shaking event!), engineers can analyze the data together to evaluate the overall strain building up in the structure. We will look at a very simple use case of this in this post.

Figure 1: A simulated view of deformation as A, B, and C experience somewhat periodic movements. f is a random amplitude designed to create noise around the base signal. Cumulative metrics could be better indicators of stress than snapshot metrics.

Figure 1 shows three metal plates riveted at their ends, forming an equilateral triangle with unit sides (area 0.433 and perimeter 3.0) on a plane. The sensors at A, B, and C continuously throw off positional data: the X and Y coordinates in a common reference frame. A pseudo-periodic movement with randomized amplitude f is simulated for the three vertices. Movement at the junctions causes stresses in the plates and in the supporting structure. Overall stress metrics with predictive ability can be computed and used for alerting. These metrics are defined in terms of successive measurements, indexed i - 1 and i.
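
As a sketch, and assuming the stress metrics are the plain geometric quantities that reappear later as fields of the Triangle schema (side lengths, perimeter, area, and accumulated displacement), they could be written as follows. The symbols P, S, d, and D are notation introduced here for illustration only.

```latex
\begin{aligned}
AB_i &= \sqrt{(x_{B,i} - x_{A,i})^2 + (y_{B,i} - y_{A,i})^2}
        \quad \text{(likewise } BC_i,\ AC_i\text{)} \\
P_i  &= AB_i + BC_i + AC_i \\
S_i  &= \tfrac{1}{2}\,\bigl|\, x_{A,i}(y_{B,i} - y_{C,i})
        + x_{B,i}(y_{C,i} - y_{A,i})
        + x_{C,i}(y_{A,i} - y_{B,i}) \,\bigr| \\
d_{V,i} &= \sqrt{(x_{V,i} - x_{V,i-1})^2 + (y_{V,i} - y_{V,i-1})^2},
        \qquad V \in \{A, B, C\} \\
D_i  &= \sum_{k=1}^{i} \bigl( d_{A,k} + d_{B,k} + d_{C,k} \bigr)
\end{aligned}
```

For the undeformed unit triangle, these give P = 3 and S = √3/4 ≈ 0.433, which agrees with the perimeter and area quoted above. P and S are snapshot metrics, while D is cumulative.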



Process Topology

Converting the above scenario to a topology of data streams & processors is straightforward as shown in Figure 2.

Figure 2: As many source topic partitions as the number of sensors/vertices. Each processor uses a local store to recover state in case of failure. The triangle processor uses a time-binned event store to detect simultaneity.

The core elements of the topology are as follows.

  • Topics. There are two topics: rawVertex and smoothedVertex. The topic rawVertex has three partitions and smoothedVertex has one partition.
  • Source producers. There are three producers (A, B, and C), each generating positional data to a distinct partition of the topic rawVertex. Hence, the rawVertex topic has as many partitions as there are vertices being followed (here, three). The value of the message is the measurement object consisting of the current coordinates, the timestamp, a running count, etc.
  • Vertex processor. A stream task attached to a partition of the rawVertex topic sees all the positional data for that vertex, in the same order as it was produced, ignoring data loss and data delay issues prior to the data making it to the partition. These issues are mitigated in our implementation of the Kafka producer by allowing no more than one in-flight request at a time and no retries. That is nice because the vertex processor can confidently compute time averages locally as it receives the measurements. The sensors are noisy, so the raw data stream is time-averaged over short periods (short enough to not mask any trends, however), producing messages for the smoothedVertex topic.
  • Vertex state store. The vertex processor as mentioned above will use a local persistent key-value store. It will have the most recent smoothed measurement for that vertex, along with any locally computable metrics such as the total displacement of the vertex. If the stream thread crashes and is restarted, it will get the most current state for this vertex from this local store.
  • Triangle processor. This is where the merged data streams from the individual vertex processors are used to identify a triangle as a function of time. We compute the various global stress metrics indicated in Figure 1 and make the call on whether to alert.
  • Triangle window store. For a triangle to be identified, we need the coordinates of A, B, and C at the same time. The concept of exact simultaneity is problematic when working across streams. Here, we employ Kafka's tumbling windows, which bin the smoothedVertex data for A, B, and C into time windows. All measurements that were made (as per the time in the smoothedVertex data) during a window's time period fall into that bin. The size of this time window should be small enough that we can take it as an instant in time (perhaps the center of the window) but big enough that at least a few items of smoothedVertex data from each of the vertex processors for A, B, and C can land in that bin. See the schematic below (panel A of Figure 3 is essentially a reproduction of an image put up by the good folks at Confluent!).

Figure 3: The window state store. (A) Flowing data is binned into time windows. The measurement time of a data point determines which bin it ends up in. This means that the points in a bin can be used to approximately identify the triangle at a point in time (perhaps the midpoint of the bin). (B) A window starts out empty and accumulates data as measurements flow in from the smoothedVertex topic. It reaches a maximum and stays there until the window is dropped after the retention time is reached. The retention time is chosen so that we can detect the plateauing of the data count in the bin and also have enough time to compute stress metrics for the triangle.
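
The binning in Figure 3 can be sketched in plain Java without any Kafka dependency. This is only an illustration of the idea and not the Kafka Streams window store API: the class TumblingBins and its methods are hypothetical names, timestamps are assumed to be non-negative epoch milliseconds, and windows are assumed to be aligned to multiples of the window size.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: bins smoothed measurements into fixed, non-overlapping
// (tumbling) time windows keyed by the window start time.
final class TumblingBins {
    private final long windowSizeMs;
    // window start -> sensors that have at least one measurement in that window
    private final Map<Long, Set<String>> sensorsByWindow = new HashMap<>();

    TumblingBins(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    // Start of the window that contains the given measurement time.
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // The measurement time, not the arrival time, decides which bin is used.
    void add(String sensor, long timestampMs) {
        sensorsByWindow
            .computeIfAbsent(windowStart(timestampMs), k -> new HashSet<>())
            .add(sensor);
    }

    // A triangle can be formed only when A, B, and C all landed in the same bin.
    boolean hasTriangle(long windowStartMs) {
        Set<String> seen = sensorsByWindow.get(windowStartMs);
        return seen != null
            && seen.contains("A") && seen.contains("B") && seen.contains("C");
    }

    // Drop windows whose end is at least retentionMs in the past.
    void expire(long nowMs, long retentionMs) {
        sensorsByWindow.keySet()
            .removeIf(start -> start + windowSizeMs + retentionMs <= nowMs);
    }
}
```

With a 1000 ms window, measurements stamped 2100, 2900, and 2999 all fall into the bin starting at 2000, while one stamped 3000 opens the next bin.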

  • Triangle state store. The triangle processor needs a persistent state store, as well, to account for failure and restart. When a window plateaus, we compute the triangle state (current vertex locations, the lengths of the sides, perimeter, area, and other stress metrics).
  • Long-term storage. Periodically, the triangle processor dumps the state store data to long-term storage like Elasticsearch for deeper analytics.
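
The vertex processor above relies on records reaching a partition in the order they were produced. A minimal sketch of producer settings that match the "one in-flight request, no retries" description follows. The keys are standard Kafka producer configuration names, while the helper class OrderedProducerConfig and the broker address in the usage note are assumptions made for illustration. Serializer settings, which a real producer also needs, are left out.

```java
import java.util.Properties;

// Hypothetical helper that collects the ordering-related producer settings.
final class OrderedProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // At most one unacknowledged request per connection, so a later batch
        // cannot overtake an earlier one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // No retries: a failed send is not re-sent, so it cannot arrive late.
        props.setProperty("retries", "0");
        return props;
    }
}
```

Calling OrderedProducerConfig.build("localhost:9092") would give a Properties object that can be extended with key and value serializers before it is handed to a producer. The trade-off of disabling retries is that a failed send is lost rather than re-sent.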

Data Model: Avro Schemas

The disk is the lifeblood of Kafka. Reading from and writing to disk is its bread and butter. Data is written to disk and pulled from disk as it moves from the source topics via intermediate topics to its long-term storage. So, the serialization of our raw and smoothed vertex objects as they are written to disk, and their deserialization as they are read back into the code, is important. Thankfully, we have Avro schemas to define the data structures, and Kafka (via Confluent) provides built-in support for the serialization and deserialization of Avro objects.


The raw measurement from the sensor at a vertex is the value of the topic rawVertex. The key for the topic is the name of the sensor (A, B, or C). X and Y are the instantaneous values for the vertex location.

{
  "namespace": "com.xplordat.rtmonitoring.avro",
  "type": "record",
  "name": "RawVertex",
  "fields": [
    { "name": "sensor", "type": "string" },
    { "name": "timeInstant", "type": "long" },
    { "name": "X", "type": "double" },
    { "name": "Y", "type": "double" }
  ]
}


The vertex processor smooths the incoming measurements from the rawVertex topic over short periods of time. It saves the latest one to its local store and also pushes it onto the smoothedVertex topic. X and Y in these records are the time-averaged coordinates of this vertex/sensor over the period from timeStart to timeEnd. The displacement distance of the vertex due to each successive measurement is accumulated in cumulativeDisplacementForThisSensor. If the vertex processor were to crash and be restarted, it has enough information in its local store to continue on from where it left off.

{
  "namespace": "com.xplordat.rtmonitoring.avro",
  "type": "record",
  "name": "SmoothedVertex",
  "fields": [
    { "name": "sensor", "type": "string" },
    { "name": "timeStart", "type": "long" },
    { "name": "timeCenter", "type": "long" },
    { "name": "timeEnd", "type": "long" },
    { "name": "X", "type": "double" },
    { "name": "Y", "type": "double" },
    { "name": "numberOfSamplesIntheAverage", "type": "long" },
    { "name": "cumulativeDisplacementForThisSensor", "type": "double" },
    { "name": "totalNumberOfRawMeasurements", "type": "long" },
    { "name": "pushCount", "type": "long" }
  ]
}
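
The smoothing step can be sketched as a small accumulator. This is an illustration under stated assumptions rather than the actual processor: VertexSmoother is a hypothetical class, the average is a plain arithmetic mean over the readings in a period, and the displacement is accumulated between successive raw readings.

```java
// Hypothetical accumulator for one sensor: averages raw (x, y) readings over a
// short period and tracks the total distance the vertex has moved.
final class VertexSmoother {
    private double sumX;
    private double sumY;
    private long samplesInAverage;
    private long totalRawMeasurements;
    private double cumulativeDisplacement;
    private double lastX;
    private double lastY;
    private boolean hasLast;

    // Fold one raw measurement into the running sums.
    void add(double x, double y) {
        if (hasLast) {
            // Distance moved since the previous raw reading.
            cumulativeDisplacement += Math.hypot(x - lastX, y - lastY);
        }
        lastX = x;
        lastY = y;
        hasLast = true;
        sumX += x;
        sumY += y;
        samplesInAverage++;
        totalRawMeasurements++;
    }

    // Return the averaged point {X, Y} for the current period and start a new
    // period. The cumulative fields carry over across periods.
    double[] flush() {
        if (samplesInAverage == 0) {
            throw new IllegalStateException("nothing to average");
        }
        double[] avg = { sumX / samplesInAverage, sumY / samplesInAverage };
        sumX = 0.0;
        sumY = 0.0;
        samplesInAverage = 0;
        return avg;
    }

    double cumulativeDisplacement() { return cumulativeDisplacement; }

    long totalRawMeasurements() { return totalRawMeasurements; }
}
```

Persisting the cumulative fields alongside the latest averaged point is what would let a restarted processor continue from where it left off.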


The triangle processor is where the rubber meets the road. The evolution of the triangle and the stress metrics are computed, saved to a local store, and periodically flushed to long-term storage. The model for the triangle has the usual suspects, such as the vertex locations, sides, and area, along with the cumulative displacement of the vertices over the entire time. The basis on which to alert is not the focus here; we can make one up.

{
  "namespace": "com.xplordat.rtmonitoring.avro",
  "type": "record",
  "name": "Triangle",
  "fields": [
    { "name": "timeStart", "type": "long" },
    { "name": "timeInstant", "type": "long" },
    { "name": "timeEnd", "type": "long" },
    { "name": "numberOfAMeasurements", "type": "int" },
    { "name": "numberOfBMeasurements", "type": "int" },
    { "name": "numberOfCMeasurements", "type": "int" },
    { "name": "xA", "type": "double" },
    { "name": "yA", "type": "double" },
    { "name": "xB", "type": "double" },
    { "name": "yB", "type": "double" },
    { "name": "xC", "type": "double" },
    { "name": "yC", "type": "double" },
    { "name": "AB", "type": "double" },
    { "name": "BC", "type": "double" },
    { "name": "AC", "type": "double" },
    { "name": "perimeter", "type": "double" },
    { "name": "area", "type": "double" },
    { "name": "totalDisplacement", "type": "double" }
  ]
}
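
The geometric fields of this record follow directly from the three vertex locations. A minimal sketch, assuming plain Euclidean side lengths and the shoelace formula for the area; TriangleMetrics is a hypothetical helper and not part of the schema or of Kafka.

```java
// Hypothetical helper computing the geometric fields of the Triangle record
// from the three vertex locations.
final class TriangleMetrics {
    final double ab;
    final double bc;
    final double ac;
    final double perimeter;
    final double area;

    TriangleMetrics(double xA, double yA, double xB, double yB, double xC, double yC) {
        // Euclidean side lengths.
        ab = Math.hypot(xB - xA, yB - yA);
        bc = Math.hypot(xC - xB, yC - yB);
        ac = Math.hypot(xC - xA, yC - yA);
        perimeter = ab + bc + ac;
        // Shoelace formula; the absolute value makes the result independent
        // of the vertex ordering (clockwise or counter-clockwise).
        area = 0.5 * Math.abs(xA * (yB - yC) + xB * (yC - yA) + xC * (yA - yB));
    }
}
```

For the undeformed triangle of Figure 1, with A at (0, 0), B at (1, 0), and C at (0.5, √3/2), this yields a perimeter of 3.0 and an area of about 0.433.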


The core pieces of the problem and the solution approach have been laid out here. What is left is the implementation and a walk-through of code snippets as needed to make the points. As this post is already getting a bit long, we will take that up in the next installment.



Published at DZone with permission of

