Developing Event-Driven Applications to Prevent Accidents

Develop your own event-driven apps to prevent accidents.

Let’s build an application that not only predicts a disaster but helps to prevent it. We will try and compare two well-known streaming technologies: Apache Spark and Apache Flink. Our application will consume an event stream, figure out that things are going bad, and trigger an alarm signal.

There is a variety of use cases, for example:

  • Events received from heat and smoke sensors in a room.

    • Alarm: fire in the room.

    •  Response: activate the fire protection system.

  • Events received from a virtualized network function (VNF): the capacity threshold has been reached.

    • Response: deploy an additional instance of this VNF.

  • An online payment system sent a sequence of events indicating a fraud.

    •  Response: block the fraudulent transaction.

We will consider the first use case – a fire alarm. A curious reader may find the code on GitHub.

The Use Case: Fire Alarm

There is a room with some important and expensive equipment that runs a 24/7 service for thousands of clients. The room is equipped with heat and smoke detectors. Our application must analyze events from the sensors, predict the fire threat, and activate the fire protection system when necessary. If we fail to prevent the fire or to extinguish it at an early stage, our equipment will be permanently damaged, our 24/7 service will collapse, and our clients will face outages. There is no need to describe the danger a fire poses to people.

Detecting fire alarm

The predictor application should recognize the following event pattern:

  1. One or several heat events are followed by one or several smoke events.
  2. All correlated heat and smoke events occurred in the same room.
  3. Smoke events occurred after the heat events, but no later than a certain timeout.

As we see, this event pattern involves two types of correlation: topology-based correlation (events from the same room) and time-based correlation (smoke events within some time interval after a heat event). This is a crucial difference from the scenario we explored in another article, Topology-based event correlation with Apache Spark Streaming, where we simply correlated events from the same base station controller.

Fire alarm: false positive

Let's assume, for illustrative purposes, that we want to exclude event sequences in which smoke events precede heat events. In such a situation, the smoke events might not be correlated with the heat events at all. The smoke detectors may simply be reacting to an old chap smoking in the staircase, which is not good in itself, but still not a reason to activate the fire sprinklers.

Problem timeout

Additionally, we have to define a maximum interval between the latest heat event and the earliest smoke event. For example, if we get a smoke event 12 hours after a heat event, these two events are unlikely to be causally related.

With these requirements in mind, let’s advance to the solution architecture.

Solution Architecture

Here is the proposed architecture of the solution.

Solution architecture

Event Producers collect readings from sensors, generate heat and smoke events in a standard format (which we will discuss a bit later), and push them to the events Kafka topic. The Fire Predictor application consumes the events and compares them with the fire alarm pattern. In case of a positive match, Fire Predictor generates an alarm and publishes it to the alarms Kafka topic. The downstream Fire Prevention System handles the alarm.

For brevity, we omit other components of the solution like event storage, alarm lifecycle or incident management. Our goal is to implement the correlation logic in Fire Predictor.

Before touching Fire Predictor itself, we have to clarify some important points. The first one is about event timestamps. All heat and smoke events must have correctly set timestamps; otherwise, Fire Predictor will not be able to correlate them. There are two possibilities: either the sensors provide timestamps themselves, or the Event Producers set them before pushing the events to Kafka. In any case, we assume that each event in the standard format has a timestamp attribute.

The second important point arises from the requirement to correlate events from the same site. Fire Predictor is a distributed application; it can be spread across multiple executors running on different machines. In order to correlate events from a particular site, Fire Predictor has to process them on the very same executor. This means that Event Producers must partition events by site. In other words, all events generated at the same site must travel through the same partition of the events Kafka topic.

To illustrate this scalability problem, let's imagine that Fire Predictor has only one executor: executor-1. In this case, executor-1 processes events from all sites. As the data volume increases, we want to scale out and add one more executor so that, for example, events from site-A are processed on executor-1 and events from site-B are processed on executor-2. How can we ensure that executor-1 receives all events from site-A? Without per-site partitioning, the only way to achieve this is to have each executor consume events from all partitions of the Kafka topic, which introduces many-to-many network communication between Kafka nodes and application nodes. With per-site event partitioning, in contrast, we can efficiently distribute the load across the application executors.
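
For illustration, here is a minimal sketch of how an Event Producer could enforce per-site partitioning on the producer side. The topic name, broker address, and serializers are assumptions for this sketch, not details taken from the original code; the key idea is simply to use the site identifier as the Kafka record key.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[java.lang.Long, String](props)

// Keying the record by siteId lets Kafka's default partitioner route all events
// of a given site to the same partition of the "events" topic.
def publish(siteId: Long, serializedEvent: String): Unit =
  producer.send(new ProducerRecord[java.lang.Long, String]("events", siteId, serializedEvent))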

Out of order events

The third important point is about the event order. Thanks to the per-site event partitioning, we can be sure that Fire Predictor receives events from a site in the same order as the corresponding Event Producer sends them. However, we have no guarantees about the order in which the Event Producer polls these events from the sensors. Even if each sensor sets event timestamps correctly, we can still expect that events from the same site are occasionally pushed to the Kafka topic out of order. We will solve this problem by introducing an additional parameter: the maximum interval of event out-of-orderness.

Event Format

The event format should be simple and generic, allowing integration with a large variety of sensors. Here is the minimal set of event attributes that we need:

  • Event timestamp

  • Site identifier

  • Arbitrary textual information

For the same reason of simplicity, we serialize events as JSON strings. A more advanced solution might employ better serialization techniques, such as Apache Avro or Google Protocol Buffers. A serialized event passing through the system looks like this:

{"timestamp":1554654092232, "siteId":100, "info":"Smoke detected"}


Note that events do not have unique identifiers. A simple sensor is not supposed to provide unique identifiers. Of course, we could equip the Event Producer with an identifier generation function, but we do not need it in our case.

However, we still need some mechanism to deduplicate events. This can be done using the pair of event attributes (timestamp, siteId): two events with the same timestamp and the same site identifier are treated as duplicates of the same event.
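
For illustration, in Spark Structured Streaming (which we try first below) such deduplication could be expressed directly on the streaming dataset. This is a sketch that assumes events is the parsed streaming Dataset with timestamp and siteId columns, and that a watermark bounds how long the deduplication state is kept.

// Drop events that share the same (siteId, timestamp) pair within the watermark window.
val dedupedEvents = events
  .withWatermark("timestamp", "10 seconds")
  .dropDuplicates("siteId", "timestamp")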

Now that all the preparation work is done, let's proceed with the Fire Predictor implementation. The first technology to try is Apache Spark Structured Streaming.

Using Apache Spark Structured Streaming

The Spark Structured Streaming API uses the concept of a dataset. A stream of data is treated as an unbounded Dataset, and the API provides a rich set of functions on such datasets. However, our fire prediction scenario deals with the chronological order of events: it looks for smoke events that occurred after heat events. It turns out that such a scenario is hard to implement with Spark window aggregation functions. Instead, we need a stateful processor that tracks all events within a bounded time window.

Spark Structured Streaming provides an API for arbitrary stateful operations. Using this API, we can write the following code:

val alarms = events
  .filter(e => isFireCandidate(e))            // keep only heat and smoke events
  .withWatermark("timestamp", "10 seconds")   // tolerate events arriving up to 10 seconds late
  .groupByKey(_.siteId)                       // maintain a separate state per site
  .flatMapGroupsWithState(
    OutputMode.Update(),
    GroupStateTimeout.EventTimeTimeout()
  )(alarmStateFunction.updateAlarmState)      // the stateful correlation logic


The stateful processing happens in a rather clumsy function:

def flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout
)(func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]


Let’s go over this function and try to understand it.

The returned Dataset[U] is the dataset of alarms generated by Fire Predictor, so the type U in our case corresponds to the alarm type.

The type K denotes the key of the group. Remember, we want to correlate events for each site (or room). Therefore, K, in our case, means site identifier. In other words, we maintain a state independently for each site.

The Iterator[V] is the collection of input events received in each trigger for the given value of the key K. This is hard to understand without recalling that Spark Structured Streaming processes events in micro-batches. The streaming engine invokes the function func for each micro-batch and each key value. In short, in our case, Iterator[V] contains all events received in one micro-batch for the site K.

Finally, the function func: (K, Iterator[V], GroupState[S]) => Iterator[U] must implement the state-handling logic. It returns Iterator[U] – one alarm or no alarms. The current state has the type S and is stored in a GroupState[S] container. When implementing the function func, we have to properly update the current state or remove it on timeout.

You can check out this implementation: GroupStateFirePredictor.scala. Note that the current state is coded in a separate class: FireAlarmState; we will get back to this class a bit later.
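
To make the shape of this function more concrete, here is a simplified, hedged sketch of what such a state-update function could look like. The FireAlarmState API used here (the no-argument constructor, withEvents, latestTimestamp, and detectedAlarm) is an assumption for illustration; the real logic lives in GroupStateFirePredictor.scala and FireAlarmState.

import org.apache.spark.sql.streaming.GroupState

def updateAlarmState(
  siteId: Long,
  events: Iterator[Event],
  state: GroupState[FireAlarmState]
): Iterator[Alarm] =
  if (state.hasTimedOut) {
    // No events arrived for this site within the timeout: drop the state.
    state.remove()
    Iterator.empty
  } else {
    val current = state.getOption.getOrElse(FireAlarmState())
    val updated = current.withEvents(events.toSeq)  // fold the micro-batch into the state
    state.update(updated)
    // Re-arm the event-time timeout relative to the latest event seen so far.
    state.setTimeoutTimestamp(updated.latestTimestamp, "10 seconds")
    updated.detectedAlarm.iterator                  // zero or one alarm
  }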

As we see, the flatMapGroupsWithState API is hard to understand and, hence, difficult to use. The notion of micro-batches introduces additional complexity because we need to reason about these micro-batches. Do heat and smoke events come in the same micro-batch or in two different micro-batches? Do we need separate test cases for the same-batch and different-batch scenarios? Yes, we do.

A few more notes on the Spark Structured Streaming API for stateful operations. The streaming engine re-shuffles the data before applying the state-processing function; this is necessary to direct events with the same key value to the same executor. This re-shuffling behavior allows us to correlate all events from the same site on a single executor. However, the Spark Structured Streaming engine consumes events from Kafka with only one consumer instance, running within the driver application (for details, see this discussion on Stack Overflow). Therefore, we have a bottleneck in the data flow: the Kafka consumer within the Spark driver application.

The user-defined state S has a restriction: it must be encodable to Spark SQL types. This means that you either use only data types for which Spark provides encoders out of the box, or you have to provide your own custom encoders.
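
For example, one way to satisfy this requirement for an arbitrary state class (assuming FireAlarmState is not composed solely of SQL-friendly types) is to fall back to a Kryo-based encoder; this is an illustration, not necessarily how the original project solves it.

import org.apache.spark.sql.{Encoder, Encoders}

// A Kryo-based fallback encoder for the custom state class.
implicit val fireAlarmStateEncoder: Encoder[FireAlarmState] = Encoders.kryo[FireAlarmState]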

Well, it was a great challenge, but we have even more interesting things ahead.

Using Apache Flink

Apache Flink provides a very rich programming model. A developer may choose a suitable level of abstraction, from low-level stateful stream processing up to high-level declarative libraries such as FlinkCEP.

It’s hard to resist the temptation of trying all the available features, but let’s go step-by-step.

Apache Flink: Low-level Stateful Processing

OK, we want to try Flink for our Fire Predictor. The first question is: can we reuse the state implementation that we have already written for the Spark Streaming application? Indeed, we can, if we use the Process Function API. You can find the code in the ProcessFunctionFirePredictor class. Remarkably, this Fire Predictor implementation uses the very same state class: FireAlarmState.

The code looks rather simple:

val fireCandidates = events
  .filter(e => isFireCandidate(e))                   // keep only heat and smoke events
  .assignTimestampsAndWatermarks(TimestampAssigner)  // event-time timestamps and watermarks
  .keyBy(_.siteId)                                   // partition the stream by site
val alarms: DataStream[Alarm] = fireCandidates.process(AlarmDetector)
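
The TimestampAssigner referenced above could be, for example, a bounded out-of-orderness watermark assigner. This is a hedged sketch that reuses the eventOutOfOrdernessMillis parameter mentioned below; the actual assigner in the project may look different.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Emits watermarks that lag behind the newest timestamp by the allowed out-of-orderness.
object TimestampAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(eventOutOfOrdernessMillis)) {
  override def extractTimestamp(event: Event): Long = event.timestamp
}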


In this pipeline, the AlarmDetector object implements the per-event processing function:

def processElement(event: Event, ctx: Context, out: Collector[Alarm]): Unit


This function accesses the current state through the context object: ctx. Each time a new event comes, the function updates the current state and generates an alarm when necessary.

However, let's remember that some events may arrive out of order. Therefore, the processElement function should not trigger an alarm immediately but should wait a bit for possible out-of-order events. We can achieve this by setting a timer:

ctx.timerService()
  .registerEventTimeTimer(event.timestamp + eventOutOfOrdernessMillis)


When the timer fires, it is the proper moment to create an alarm. The API provides the onTimer function to override:

override def onTimer(
  timestamp: Long,
  ctx: OnTimerContext,
  out: Collector[Alarm]
): Unit = {
  val fireAlarm = ... // Create an alarm when needed
  fireAlarm.foreach { alarm =>
    out.collect(alarm)  // emit the alarm downstream
    state.clear()       // reset the per-site state
  }
}


Unlike Spark, Flink does not impose serious restrictions on the data type of the state (see Data Types and Serialization). For simple data types, Flink uses its own serialization framework; for generic types, it falls back to Kryo serialization. The Flink State API provides a convenient way to access the state in streaming operations. For us, this means that we can simply pack an instance of the FireAlarmState class into a ValueState container, and that's enough.
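
For illustration, the keyed state could be declared inside the process function (i.e., as a member of AlarmDetector) roughly like this. It is a minimal sketch; the descriptor name is an assumption.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

// Keyed state holding the per-site FireAlarmState.
private lazy val state: ValueState[FireAlarmState] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[FireAlarmState]("fire-alarm-state", classOf[FireAlarmState])
  )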

Thanks to keyBy(_.siteId), the events received from the same site come to the same Flink executor.

Finally, the Flink Kafka source launches Kafka consumers on all Flink executors. Compared to the single Kafka consumer in Spark Structured Streaming, Flink provides more scalable integration with Kafka.

Well, even the low-level Flink API seems very promising. Shall we proceed to the next level?

Apache Flink: Session Windows

Session windows are frequently used to monitor user activity on a website. A session window starts when a user becomes active on the website and terminates when the user remains inactive for a sufficiently long period. Any subsequent activity of that user triggers the creation of a new session window.

Session window

It seems that we just need to replace website users with sites generating events. The result is SessionWindowFirePredictor, and it looks neat:

val alarms = fireCandidates
  .window(EventTimeSessionWindows.withDynamicGap(TimeGapExtractor))  // per-site session windows
  .trigger(EventTimeTrigger.create())
  .aggregate(
    AlarmAggregator,  // incrementally aggregates events within the window
    WindowFunction    // emits the alarm when the window fires
  )


The tricky thing here is that only heat events are supposed to start session windows. This is done by effectively disabling session windows created by smoke events:

private object TimeGapExtractor
  extends SessionWindowTimeGapExtractor[Event] {
  ...

  override def extract(event: Event): Long =
    if (event.isHeat)
      problemTimeoutMillis  // a heat event opens a session lasting up to the problem timeout
    else
      1L                    // a 1 ms gap effectively prevents smoke events from starting a session
}


Apache Flink: Interval Joins

Flink interval joins allow joining elements of two streams such that an event from the second stream lies within a relative time interval around an event from the first stream.

Interval join

The code in IntervalJoinFirePredictor simply looks for smoke events that occurred in the time interval:

(heatEvent.timestamp, heatEvent.timestamp + problemTimeout)


Here is how it looks:

val alarmCandidates: DataStream[(Event, Event)] = heats.intervalJoin(smokes)
  .between(Time.milliseconds(0L), Time.milliseconds(problemTimeoutMillis))  // smoke must follow heat within the timeout
  .lowerBoundExclusive()
  .upperBoundExclusive()
  .process(JoinEventsFunction)  // pairs each heat event with a matching smoke event
val alarms: DataStream[Alarm] = alarmCandidates
  .keyBy(_._1.siteId)           // key by the site of the heat event
  .timeWindow(Time.milliseconds(problemTimeoutMillis))
  .apply(CreateAlarmFunction)   // collapses candidate pairs into a single alarm


Note that we perform an additional window aggregation in order to convert alarm candidate events (heat-smoke pairs) to alarms. This step is necessary to avoid creating an alarm for each candidate pair; instead, an alarm should be created only for the first candidate pair. That is, having candidate pairs:

(heat1, smoke1), (heat1, smoke2), (heat1, smoke3),


An alarm will be created only for (heat1, smoke1).

Of course, session windows and interval joins are not the only options to use in the fire prediction scenario. Now, we are advancing to even more interesting Flink features.

Apache Flink: Complex Event Processing

FlinkCEP is a Complex Event Processing library on top of Flink. Here is the corresponding implementation of Fire Predictor: CepFirePredictor. Just have a look at the pattern declaration:

val pattern = Pattern.begin[Event](
  "heat",
  AfterMatchSkipStrategy.skipPastLastEvent()        // skip strategy explained below
).where(_.isHeat).oneOrMore                         // one or more heat events...
  .next("smoke").where(_.isSmoke)                   // ...followed by a smoke event
  .within(Time.milliseconds(problemTimeoutMillis))  // the whole sequence within the problem timeout


Could you imagine a correlation engine with rules written almost in plain English? FlinkCEP hides all the low-level state management under the hood and exposes a declarative DSL to describe the patterns to search for. The pattern declaration is self-explanatory. Gosh, now even sales guys will be able to develop streaming applications (but their customers will do it better).

Just one clarification: AfterMatchSkipStrategy.skipPastLastEvent() means keep only full matches and discard partial ones. For example, given matches like:

(heat1, smoke), (heat1, heat2, smoke), (heat1, heat2, heat3, smoke),


Flink will keep only the full sequence, (heat1, heat2, heat3, smoke), and discard all partial sequences.
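
To complete the picture, here is a hedged sketch of how such a pattern might be applied to the keyed event stream. The createAlarm helper and the exact selection logic are assumptions; see CepFirePredictor for the real code.

import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._

val alarms: DataStream[Alarm] = CEP
  .pattern(fireCandidates, pattern)  // fireCandidates is keyed by siteId
  .select { matched: scala.collection.Map[String, Iterable[Event]] =>
    val heats = matched("heat")        // all heat events of the full match
    val smoke = matched("smoke").head  // the smoke event that completed the match
    createAlarm(heats, smoke)          // hypothetical helper that builds an Alarm
  }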

So far, we have tried several Flink APIs – from low to high level. Indeed, Flink provides a rich choice. Now, let’s consider the two streaming engines from the angle of testing.

Unit Testing

As long as we work at the low level, Spark and Flink look similar. A developer has to implement the function that runs the stateful processing:

  • With Spark Streaming:  func: (K, Iterator[V], GroupState[S]) => Iterator[U]
  • With Flink:  def processElement(event: Event, ctx: Context, out: Collector[Alarm]): Unit

Hence, the developer has to cover this function with unit tests. There is not much difference between the two technologies here.

However, Spark and Flink offer very different conveniences for integration-style unit testing. So far, we have one Spark implementation and four Flink implementations of the same business logic:

  •  GroupStateFirePredictor 
  •  ProcessFunctionFirePredictor 
  •  SessionWindowFirePredictor 
  •  IntervalJoinFirePredictor 
  •  CepFirePredictor 

Obviously, we want the same set of validation tests, with the same input data and the same expectations, for all of them. This test set ensures that each particular implementation meets the Fire Predictor requirements.

It turns out that such integration tests are rather easy to write with Flink. First, Flink allows creating data streams directly from collections for unit tests:

val events = env.fromElements(
  Event(timestamp = 500L, siteId = 10L, info = "Heat event"),
  ...
)


Moreover, Flink provides a utility abstract class AbstractTestBase that simplifies the creation of the runtime environment. Actually, this testing harness runs a mini-cluster for unit tests.

The output of the stream can be directed to a local collection, which can be compared with the expected output.
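
As a rough illustration (the sink below and its exact shape are assumptions, and the SinkFunction signature differs slightly between Flink versions), collecting the output of the alarm stream into a local buffer could look like this:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Accumulates emitted alarms in a shared buffer; kept in an object because the sink
// instance is serialized and executed inside the test mini-cluster.
object CollectingSink extends SinkFunction[Alarm] {
  val collected = new java.util.concurrent.ConcurrentLinkedQueue[Alarm]()
  override def invoke(value: Alarm): Unit = collected.add(value)
}

alarms.addSink(CollectingSink)
env.execute("fire-predictor-test")

// Now CollectingSink.collected can be compared with the expected alarms.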

Unfortunately, such testing tools are not available in Spark Structured Streaming. One cannot simply create a streaming dataset from a collection of objects. Although it is technically possible to create a local Spark cluster within a unit test, there is no easy way to push a sample data set into a stream and collect the output.

Conclusions

In this article, we employed two great technologies – Apache Spark and Apache Flink – for the same job: analyzing a stream of events to prevent an accident. We have seen that Apache Flink provides a richer set of APIs for processing event streams. The FlinkCEP library offers a concise and convenient DSL for declaring event patterns, while the DataStream API provides a large variety of operations on event streams.

It is also important that Apache Flink provides convenient tools for writing unit tests. These tools enable faster and more reliable development. Personally, I consider this a key advantage.

Nevertheless, both technologies are mature and backed by established communities. Both Spark and Flink allow us to create great solutions, thanks to the following features:

  • Scalable processing on multiple executors
  • Fault tolerance and recovery from a checkpoint
  • Event deduplication
  • Handling late and out-of-order events
  • Integration with messaging systems like Apache Kafka

It is remarkable how open-source technologies are displacing some legacy commercial products.

Interesting Links

Real-Time Analysis of Popular Uber Locations using Apache APIs: Spark Structured Streaming, Machine Learning, Kafka, and MapR-DB.

A brief history of time with Apache Flink: Quality monitoring of Telecom networks at Bouygues Telecom.
