Anomaly Detection With Kafka Streams
Learn how to perform anomaly detection using Kafka Streams with an example of a loan payment website that needs to send an alert if the payment is too high.
Apache Kafka is widely adopted across IT organizations. Kafka is used as:
- The backbone of event infrastructure
- A streaming platform
- A platform that can move data from real-time to offline analytics systems/platforms
- An event/command source back-end
- A platform to store all the data that can be converted to specific-purpose materialized views
Many distributed engines such as Apache Spark, Apache Flink, etc. connect to Kafka topics/streams to take in data and to publish their computation results. The Kafka architecture supports low latency, high scale, fault tolerance, replayability, change data capture semantics, and compacted topics.
If you are getting started with Kafka, please refer to this article to get familiar with the Kafka broker, Kafka clients, and message delivery semantics. Note that Kafka version 0.11 introduced exactly-once semantics in the producer client library.
Kafka Streams is a set of libraries introduced in Kafka version 0.10. Key benefits of using the Kafka Streams library are:
- The client of the Streams application connects to one or many Kafka topics to read data in a streaming fashion to perform aggregation, CEP, and anomaly detection. We'll talk more below about other use cases that can be implemented using the Kafka Streams library.
- The core logic in the compute layer can be implemented in a programming language such as Java using the Kafka Streams DSL or the Processor API. Confluent has also introduced KSQL, in which SQL syntax can be used to implement the core logic.
- There's no need for any additional cluster (like an Apache Spark or Apache Flink cluster) to manage the scale and resiliency needs of the aggregation logic. The compute layer can be scaled horizontally simply by adding more processing nodes. The Kafka cluster and the Streams library take care of the resiliency and scaling needs. We'll learn more about this in the section titled Kafka Streams Architecture Explained.
- The client/consumer code can also store any intermediate state during the aggregation logic calculation. The Kafka Streams library manages the state transparently without application developers coding for it. The state is kept in a local state store (by default, RocksDB) and backed up by a compacted topic. If a processing node fails, the state is automatically failed over to the next available node, which resumes processing from where the failed node left off. Again, we'll learn more about this in the section titled Kafka Streams Architecture Explained.
- Systems can be further scaled up by adding Kafka topic partitions to accommodate a high throughput of messages.
- The Kafka Streams library has great support for event-time, process-time, ingestion-time, watermarks, and late-arriving event semantics. Refer to the Reference section for these details.
- The Kafka Streams library has first-class support for unit and integration tests on the developer machine.
This article and accompanying reference application source code showcase how to use the Kafka Streams library to build scalable and resilient real-time monitoring solutions or complex event processing (CEP) engines to detect anomalies.
The problem is in the loan payment domain, where customers have taken out a loan and need to make monthly payments to repay the loan amount.
Assume there are millions of customers in the system and all of them need to make monthly payments to their accounts. Each customer may have a different monthly due date.
Each customer payment will appear as a PaymentScheduleEvent. Customers can make more than one PaymentScheduleEvent per month. Each monthly due date for a customer will appear as a PaymentDueEvent.
An arbitrarily chosen anomaly condition for this example is that if the amount due is more than $150 for any customer at any point in time, this generates an anomaly.
The above diagram shows how various events appear across space and time for two customer accounts. Red events are for the customer with account #1 and blue events are for the customer with account #2. Imagine that there are millions of possible customer events within a month.
As one can imagine, the events can appear at any point in time, the events can come in any order, and there can be thousands of events at any point in time. The anomaly detector needs to group the similar events that belong to a customer and apply the appropriate logic to these grouped events to determine the impact of these events on a customer at a point in time in a streaming fashion.
The anomaly detector should generate anomalies on a per-event and per-customer basis. The anomaly condition is that if an account has more than a $150 payment due, then an anomaly needs to be generated. When a new event arrives for a customer, the anomaly should be recalculated only for that customer and not for any other customer. This way, the solution is scalable, does not waste CPU cycles, and is simple to reason about and maintain.
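The per-customer rule above can be illustrated without Kafka at all. The following is a minimal plain-Java sketch, not the reference application's code: the class name AmountDueTracker, its method names, and the assumption that a due event increases the amount owed while a payment event decreases it are all hypothetical choices made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch of the per-customer anomaly rule; no Kafka dependency. */
final class AmountDueTracker {
    // Threshold from the article: more than $150 due triggers an anomaly.
    static final long THRESHOLD_DOLLARS = 150;

    // Running amount due per customer account (assumed to be whole dollars).
    private final Map<String, Long> amountDueByAccount = new HashMap<>();

    /** Assumption: a due event increases the amount owed by the account. */
    Optional<String> onPaymentDue(String accountId, long amount) {
        return apply(accountId, amount);
    }

    /** Assumption: a payment event decreases the amount owed by the account. */
    Optional<String> onPayment(String accountId, long amount) {
        return apply(accountId, -amount);
    }

    // Updates and re-evaluates only the account named in the event.
    private Optional<String> apply(String accountId, long delta) {
        long due = amountDueByAccount.merge(accountId, delta, Long::sum);
        return due > THRESHOLD_DOLLARS
                ? Optional.of("ANOMALY account=" + accountId + " amountDue=" + due)
                : Optional.empty();
    }

    long amountDue(String accountId) {
        return amountDueByAccount.getOrDefault(accountId, 0L);
    }

    public static void main(String[] args) {
        AmountDueTracker tracker = new AmountDueTracker();
        System.out.println(tracker.onPayment("account-1", 100));    // balance is -100
        System.out.println(tracker.onPaymentDue("account-1", 200)); // balance is 100
        System.out.println(tracker.onPaymentDue("account-1", 211)); // balance is 311
    }
}
```

Because each event touches only one map entry, the cost of handling an event does not depend on how many other customers exist, which is the property the paragraph above asks for.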
How to Run the Sample Application
Start Kafka and ZooKeeper using the Confluent downloads. Turn on the feature that allows topics to be created dynamically (the broker setting auto.create.topics.enable), as the sample depends on it. The easiest way to install Kafka is from the Confluent platform. Install the latest Confluent platform version (as of this writing, 3.3.0) by following these instructions.
Start the needed component by following these instructions. Make sure Kafka is running on localhost:9092. If it is a different port, change it in the
Start Kafka and ZooKeeper by executing these steps:
Startup.java. This class will create two topics (schedule payment event and due event), send events, and create a stream that looks for the anomaly condition. (Remember that the anomaly condition is: If an account has more than a $150 payment due, then an anomaly needs to be generated.)
After running the application, wait 12 seconds. The log will show the sending of all the events and two anomalies being generated.
Refer to the next section for additional notes and to learn more about the Kafka Streams library.
To run the unit testing code, run KafkaStreamTest.
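If the broker does not run on localhost:9092, one option is to make the address configurable instead of editing the source. The sketch below is an assumption about how that could look, not how the sample application is written: the class name, the application id, and the KAFKA_BOOTSTRAP environment variable are hypothetical. The property keys application.id and bootstrap.servers are the standard Kafka Streams configuration keys.

```java
import java.util.Properties;

/** Sketch: build Kafka Streams client settings with an overridable broker address. */
final class StreamsConfigSketch {
    static final String DEFAULT_BOOTSTRAP = "localhost:9092";

    static Properties buildConfig(String bootstrapServers) {
        Properties props = new Properties();
        // Hypothetical application id chosen for this illustration.
        props.put("application.id", "payment-anomaly-sketch");
        // Fall back to the address the article expects when none is supplied.
        boolean missing = bootstrapServers == null || bootstrapServers.trim().isEmpty();
        props.put("bootstrap.servers", missing ? DEFAULT_BOOTSTRAP : bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // KAFKA_BOOTSTRAP is a hypothetical environment variable name.
        Properties props = buildConfig(System.getenv("KAFKA_BOOTSTRAP"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```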
Kafka Streams Architecture Explained
This section explains the key capabilities of the Kafka Streams architecture.
Support for Building Scalable System With Scalable Compute Nodes Without a Compute Cluster
A solution can scale out by adding stateless compute nodes developed in Java (for example, with Spring) or other JVM languages, without needing an additional compute cluster.
Kafka and its Streams library, used in the compute layer, spread the work across the available compute nodes; the achievable parallelism is bounded by the number of topic partitions.
There is no need for another compute cluster such as Spark, Flink, etc. Kafka's consumer group coordination takes on the roles of resource manager, load balancer, and health monitor for the compute layer.
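The reason this scales is that every event for a given customer carries the same key and therefore lands on the same partition, so one node sees all of that customer's events. The following plain-Java sketch illustrates the idea only. It uses String.hashCode as a simplified stand-in; Kafka's default partitioner hashes the serialized key with murmur2, so the actual partition numbers will differ. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: keyed events map deterministically to partitions. */
final class PartitionSketch {
    // Simplified stand-in for key-based partitioning (not Kafka's murmur2 hash).
    static int partitionFor(String accountId, int numPartitions) {
        return Math.floorMod(accountId.hashCode(), numPartitions);
    }

    // Groups event keys by the partition they would be routed to.
    static Map<Integer, List<String>> groupByPartition(List<String> accountIds,
                                                       int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String id : accountIds) {
            byPartition
                    .computeIfAbsent(partitionFor(id, numPartitions), p -> new ArrayList<>())
                    .add(id);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> eventKeys =
                Arrays.asList("account-1", "account-2", "account-1", "account-3");
        // Both "account-1" events always end up in the same partition's list.
        System.out.println(groupByPartition(eventKeys, 4));
    }
}
```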
Support for Building Resilient Systems
Each compute layer node uses the Kafka Streams library, which comes with a transparent RocksDB store for the local state. This state is seamlessly backed up in a Kafka compacted topic.
Aggregate results and other state are kept in RocksDB (again, transparently, without the application using the RocksDB API), and Kafka automatically migrates this state to another compute node if a compute node fails. The event store can be queried back as needed.
Kafka Streams also supports interactive queries against the state held on all the nodes in the compute layer, which an application can expose through, for example, a REST API. The host and port number of these nodes are exchanged automatically among the nodes by Kafka.
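The recovery mechanism can be pictured as replaying a log of changes. The sketch below is a simplified plain-Java model, not Kafka Streams code: every write to the local store is also appended to a changelog, and a replacement node rebuilds the store by replaying it, with the last value per key winning (which is also why a compacted topic is sufficient as a backup). All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: a local store backed by a changelog that can rebuild it. */
final class ChangelogRestoreSketch {
    /** One changelog record; a null value marks a deletion (tombstone). */
    static final class Change {
        final String key;
        final Long value;
        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Long> localStore = new HashMap<>();
    private final List<Change> changelog = new ArrayList<>();

    // Every local write is mirrored to the changelog.
    void put(String key, long value) {
        localStore.put(key, value);
        changelog.add(new Change(key, value));
    }

    void delete(String key) {
        localStore.remove(key);
        changelog.add(new Change(key, null));
    }

    Map<String, Long> localStore() { return localStore; }
    List<Change> changelog() { return changelog; }

    // What a replacement node would do: replay the log, last write per key wins.
    static Map<String, Long> restore(List<Change> changelog) {
        Map<String, Long> restored = new HashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                restored.remove(c.key);
            } else {
                restored.put(c.key, c.value);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangelogRestoreSketch node = new ChangelogRestoreSketch();
        node.put("account-1", 100);
        node.put("account-1", 311);
        node.put("account-2", 50);
        System.out.println(restore(node.changelog()).equals(node.localStore()));
    }
}
```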
Performing Unit/Integration Tests From a Developer Machine
The solution includes KafkaStreamTest, which showcases how to create end-to-end test cases that stand up an embedded Kafka and ZooKeeper server.
Kafka topics are created on the embedded Kafka broker and a stream topology is created that reads from these Kafka topics.
Each test case sets up and tears down the entire stack and is repeatable across each test case. The local state store is also removed after each test case execution.
A QueryableAnomalyPublisher is provided, which helps a JUnit test listen to the anomalies published by the stream engine.
Other Use Cases That Can Be Implemented With the Kafka Streams Library
- Implement highly scalable anomaly detection use cases.
- Implement complex event processing engines.
- Implement real-time feeds for live dashboards and charts.
- Implement a real-time monitoring solution.
- There is also the emerging KSQL, a SQL-like language that can be used to implement anomaly/CEP logic and store the results in the state store instead of using the DSL or the Processor API directly.
- If needed, the results in the state store can be moved into a Kafka topic and then, using Kafka Connect, into a scalable data store such as Elastic, S3, HDFS, JDBC, Redis, Cassandra, etc.
Cons of Using Kafka Streams Library
- The solution is a Kafka-centric approach. Incoming data needs to be available on Kafka topics.
- Alternative solutions are to implement it using Spark/Flink or to use CEP (Esper) solutions. In such cases, a separate cluster needs to be maintained.
- If the aggregate result data needs to be stored permanently or needs to be scaled out independently, then push the state to a topic and use Kafka Connect to push the state into a scalable data layer.
Following are some of the key articles referenced in preparation of this example project. These articles provide good background and concepts on streaming applications. Take time and digest these concepts.
Part 1 of a six-article series from Confluent on Kafka's role in data-intensive applications. Also, read all six articles that are listed at the end of the first article.
GitHub page about using event-time instead of ingestion time or process time by using EventTimeExtractor
Getting to a Clean State
Sometimes, Kafka needs to be cleaned up to get to a clean state. To do this, perform these steps:
rm -rf zookeeper schema-registry-logs kafka-logs
rm -rf logs
Outer Join Sliding Window Explained
Kafka Streams allows performing an outer join across topics and applying sliding windows to group events within a timeframe. For this example, the sliding window size does not matter because events from both topics are stored in a local store (KTable), and their expiry is controlled by the application-specific code in PaymentDueAnomalyDetector. The events are sent via EventPublisher, and the outer join and sliding window (five seconds long) are configured in PaymentAnomalyApp. Although the window size does not matter for this application, the consequence of these settings is explained below.
EventPublisher sends events to two topics (schedule and due events topics) and all events have the same key, which is customer account #1.
A Kafka stream, paymentEventsOuterJoinStream, is created inside PaymentAnomalyApp across these two topics. This stream has a sliding window across these two topics, which is set to five seconds.
Send a payment event for $100.
Send another payment event for $101.
Send a due event of $200 after a one-second delay.
All three of the above events will appear together because the outer join uses a five-second sliding window.
Wait four seconds and send another due event for $211. This event ($211) appears in the next sliding window. The $101 payment event will appear again in this sliding window, as well. This happens because the five-second sliding windows overlap continuously, so the $101 payment event falls into two overlapping sliding windows.
The last event, for $212, will appear by itself because it is beyond the 5-second sliding window's reach of all the schedule payment events.
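The walk-through above can be approximated in plain Java. This is a simplified batch model of a windowed outer join, not the Kafka Streams implementation, which emits results incrementally as records arrive. The class name and the millisecond timestamps are assumptions chosen so that the $100 payment falls just outside the window of the $211 due event while the $101 payment stays inside it. All events are assumed to share one key, as in the article, so the key is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: outer join of two event lists within a time window. */
final class WindowedOuterJoinSketch {
    static final class Event {
        final String type;
        final long amount;
        final long timestampMs;
        Event(String type, long amount, long timestampMs) {
            this.type = type;
            this.amount = amount;
            this.timestampMs = timestampMs;
        }
        @Override public String toString() { return type + "(" + amount + ")"; }
    }

    // Pairs events whose timestamps are at most windowMs apart;
    // events with no partner are emitted alone, paired with "null".
    static List<String> outerJoin(List<Event> payments, List<Event> dues, long windowMs) {
        List<String> out = new ArrayList<>();
        Set<Event> matchedDues = new HashSet<>();
        for (Event p : payments) {
            boolean matched = false;
            for (Event d : dues) {
                if (Math.abs(p.timestampMs - d.timestampMs) <= windowMs) {
                    out.add(p + "+" + d);
                    matchedDues.add(d);
                    matched = true;
                }
            }
            if (!matched) out.add(p + "+null");
        }
        for (Event d : dues) {
            if (!matchedDues.contains(d)) out.add("null+" + d);
        }
        return out;
    }

    // Hypothetical timestamps (ms) loosely following the article's sequence.
    static List<String> sampleRun() {
        List<Event> payments = Arrays.asList(
                new Event("payment", 100, 0), new Event("payment", 101, 500));
        List<Event> dues = Arrays.asList(
                new Event("due", 200, 1_500), new Event("due", 211, 5_500),
                new Event("due", 212, 20_000));
        return outerJoin(payments, dues, 5_000);
    }

    public static void main(String[] args) {
        sampleRun().forEach(System.out::println);
    }
}
```

With these assumed timestamps, the $101 payment pairs with both the $200 and the $211 due events, while the $212 due event is emitted without a partner.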
This example uses event-time instead of ingestion-time or process-time by using EventTimeExtractor. Kafka supports all three of these settings.