
Easy Event Processing With Var, Lombok, and Fluxtion


Event processing made easy with var, Lombok, and Fluxtion.


In this article, I am combining two products, Lombok and Fluxtion, to demonstrate how tools can reduce both the code written and time to delivery while improving the readability of the code. The use of var from Java 10 improves the situation even further. Both products and var use inference at build time to accelerate development.

Fluxtion's ethos is to minimize waste; our goal here is to remove the boilerplate code, reduce code-noise, and simplify integration tasks. We want to expend as little development time as possible while still delivering an efficient and high-performance solution capable of processing millions of messages per second.

Using the techniques described, I compare a Fluxtion/Lombok implementation to a Scala example using Akka streams — the Java version requires less code and is simpler to build.

Also, just a little housekeeping, apologies for not acknowledging Richard Warburton of Opsian in my first blog.

So with that out of the way, let's get to it!

Code-Signal-to-Noise Ratio

When we code, we address two main tasks:

  • Translating business requirements into programmatic logic
  • Interfacing the logic with the deployment environment

Ideally, we would spend all our time on the first and none on the second, while also reducing the total volume of code written. Balancing abstraction against developer empowerment is not easy: with too much abstraction, we remove expressive power. I hope to strike a good balance with the approach taken in this article.

Imagine writing a tax calculation that takes 50 lines, while the code for databases, web servers, marshaling, logging, etc. requires 1,000 lines. Although it demonstrates technical capability, the purely technical implementation detail carries no business value. Viewed from another angle, the business logic is the signal and the infrastructure code is the noise. The solutions we write can be measured by their signal-to-noise ratio with respect to useful business logic.

Wikipedia defines signal-to-noise ratio as:

Signal-to-noise ratio (abbreviated SNR or S/N) is a measure used in science and engineering that compares the level of a desired signal to the level of background noise. SNR is defined as the ratio of signal power to the noise power, often expressed in decibels. A ratio higher than 1:1 (greater than 0 dB) indicates more signal than noise.
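
As a rough worked example of the analogy, the tax-calculation figures (50 lines of business logic against 1,000 lines of infrastructure) can be plugged into the standard decibel formula. Treating line counts as "power" is an assumption made purely for illustration, and the class and method names are hypothetical.

```java
// Illustrative only: treats lines of code as the "power" of signal and noise.
class CodeSnr {

    // Plain ratio of business-logic lines to infrastructure lines.
    static double ratio(int signalLines, int noiseLines) {
        return (double) signalLines / noiseLines;
    }

    // The same ratio expressed in decibels: 10 * log10(signal / noise).
    static double decibels(int signalLines, int noiseLines) {
        return 10.0 * Math.log10(ratio(signalLines, noiseLines));
    }

    public static void main(String[] args) {
        // 50 lines of tax logic against 1000 lines of plumbing.
        System.out.println("ratio = " + ratio(50, 1000));
        System.out.println("dB    = " + decibels(50, 1000));
    }
}
```

For these inputs the ratio is 0.05, roughly -13 dB, well below the 1:1 (0 dB) point at which signal and noise are equal.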

It is desirable to aim for a high SNR in most systems. In programming terms, some of the advantages of a high SNR are:

  • Less code to write
  • Easier business logic to understand and maintain
  • Shorter learning curve
  • Simpler debugging/fault finding, fewer things to get wrong
  • More efficient development

In Java, we have felt this pressure for better code SNR over the years, moving from heavyweight J2EE containers to simpler frameworks like Spark and Spring Boot. The language itself has accommodated this shift by introducing changes such as lambdas, streams, method references, and var variable declarations.
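
To make the language-level part of that shift concrete, here is a small sketch, unrelated to Fluxtion, that finds the longest name in a list twice: once with an explicit loop and once with a stream, a method reference, and var. The class and method names are made up for the example.

```java
import java.util.Comparator;
import java.util.List;

class LanguageNoise {

    // Older style: explicit types and a hand-written loop.
    static String longestVerbose(List<String> names) {
        String longest = "";
        for (String name : names) {
            if (name.length() > longest.length()) {
                longest = name;
            }
        }
        return longest;
    }

    // Java 10 style: var, a stream, and a method reference.
    static String longestTerse(List<String> names) {
        var longest = names.stream().max(Comparator.comparingInt(String::length));
        return longest.orElse("");
    }
}
```

Both methods express the same requirement; the second leaves the iteration and the intermediate type for the compiler and library to work out.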

Combining Fluxtion and Lombok

Before the example, a quick primer on Fluxtion and Lombok:

Fluxtion Primer

Fluxtion is an embeddable streaming event processing engine written in Java. The developer describes the processing in a mixture of declarative and imperative forms so Fluxtion can generate a decision engine. The engine is serialized as Java code and can be embedded in any Java application. The application feeds events into the engine for stream processing.

Engine generation can happen inline in the application or as part of the build process with a Maven plugin.

Lombok Primer

Lombok is a utility that automatically writes boilerplate code for Java classes, saving developers time and reducing code noise. Executing as an annotation processing tool, Lombok generates bytecode representing the boilerplate code for annotated classes. A partial list of Lombok features:

  • Automatic bean style getter and setter for properties
  • HashCode and equals generated for properties
  • Automatic toString method
  • Automatic constructor for all class properties

Add Lombok to your Maven build and your IDE should just work; it does with NetBeans and IntelliJ.
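
For a sense of how much code those features replace, here is a hand-written sketch of roughly what @Data and @AllArgsConstructor would produce for a class with a single field. It is an approximation, not Lombok's actual output: the class name Reading is hypothetical, and Lombok's real hashCode formula and helper methods differ in detail.

```java
import java.util.Objects;

// Hand-expanded approximation of:
//   @Data @AllArgsConstructor class Reading { private double temp; }
class Reading {
    private double temp;

    // @AllArgsConstructor: one parameter per field.
    public Reading(double temp) {
        this.temp = temp;
    }

    // @Data: bean-style getter and setter.
    public double getTemp() { return temp; }
    public void setTemp(double temp) { this.temp = temp; }

    // @Data: equals and hashCode based on the fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Reading)) return false;
        return Double.compare(((Reading) o).temp, temp) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(temp); // assumption: Lombok uses a different formula
    }

    // @Data: toString listing each field.
    @Override
    public String toString() {
        return "Reading(temp=" + temp + ")";
    }
}
```

Around thirty lines of scaffolding collapse to two annotations and one field declaration.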

Streaming Max Temp Example

Let's look at a common Fluxtion usage pattern: subscribe to a stream of events, extract a value from each event, perform a calculation on the value, filter, and push the result into a user object. In this simple example, we have the following requirements to meet:

  • Listen to temperature events
  • Extract the temperature
  • Maintain the maximum temperature
  • Push the temperature into a user-defined instance when there is a new maximum

Clone the repo from GitHub and use this article's tagged version. The project is here.

git clone --branch article_lombok_july2019 https://github.com/gregv12/articles.git
cd articles/2019/june/lombok/
mvn clean install


The Fluxtion code to deliver the processing requirements:

select(TempEvent::getTemp)
  .map(max()).notifyOnChange(true)
  .push(new MyTempProcessor()::setMaxTemp);


This gives a high code SNR and a low line count; all the code is business-logic-focused. To achieve this, Fluxtion makes use of method references and type inference. The method references allow Fluxtion to infer the desired behavior: what functions to build, the source and target types, and how to pass data from one node to another in the execution graph. They also give us a pleasant, type-safe way to express arbitrary logic. It is the inference employed by the tool that relieves the developer of having to express every processing step explicitly, giving us a low-code environment to work in.

After Fluxtion generation, the serialized streaming event processor is here, represented as Java code. A test for the example is here.

    @Test
    public void testTemp() throws Exception{
        EventHandler handler = new InlineLombok().handler();
        ((Lifecycle)handler).init();
        handler.onEvent(new InlineLombok.TempEvent(10));
        handler.onEvent(new InlineLombok.TempEvent(9));
        handler.onEvent(new InlineLombok.TempEvent(17));
        handler.onEvent(new InlineLombok.TempEvent(16));
        handler.onEvent(new InlineLombok.TempEvent(14));
        handler.onEvent(new InlineLombok.TempEvent(24));
        Assert.assertEquals(3, MyTempProcessor.count);
    }


Output:

08:08:42.921 [main] INFO  c.f.generator.compiler.SepCompiler - generated sep: D:\projects\fluxtion\articles\2019\june\lombok\target\generated-sources\fluxtion\com\fluxtion\articles\lombok\temperature\generated\lombok\TempMonitor.java
new max temp:10.0
new max temp:17.0
new max temp:24.0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.79 sec


Processing graph image:


Key:

Orange oval: incoming event

Blue rectangle: a node that has an event as input

Green rectangle: a node managed by the event processor

Blue arrow: points in the direction of event flow. Object references run in the reverse direction of the arrow.

Code Explanation

Looking closer at the first line in the example above, select(TempEvent::getTemp), we can examine the inference that Fluxtion is making. The logic implied here is:

  • Create a subscription for events of type TempEvent
  • Add a node that extracts the value of getTemp from the incoming event
  • Make the temp value available as a Number property of a node
  • Notify children of a change to the temp value when an incoming temperature event is received

The map, notifyOnChange, and push functions are steps added to the execution chain. See the Wrapper interface of the Fluxtion streaming module for details. Due to the high SNR, it is easy to understand their purpose and effect, but for completeness:

  • map(max()): Extracts the Number property from the previous node (the temperature) and applies it to a stateful max function whenever a new value is received. The current max is stored in a node with a Number property, and child nodes are notified of the current max each time an event is received.
  • notifyOnChange(true): A stateful function that triggers only when the monitored value has updated and differs from the previous value, so only new max values are propagated to child nodes.
  • push(new MyTempProcessor()::setMaxTemp): Adds a user node, MyTempProcessor, to the execution chain. When triggered by a new max, it pushes the value of the node into setMaxTemp of MyTempProcessor. All type conversions for primitive types are carried out without generating garbage.
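
To make the behavior of the chain concrete, the sketch below hand-codes the same steps in plain Java. It is not the code Fluxtion generates and it uses none of Fluxtion's API; the class MaxTempChain and its members are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleConsumer;

// Plain-Java analogue of select -> map(max) -> notifyOnChange -> push.
class MaxTempChain {
    private final DoubleConsumer target; // stands in for MyTempProcessor::setMaxTemp
    private double max = Double.NEGATIVE_INFINITY;

    MaxTempChain(DoubleConsumer target) {
        this.target = target;
    }

    // Called once per incoming temperature event.
    void onTemp(double temp) {
        double previous = max;
        max = Math.max(max, temp);   // map(max()): stateful maximum
        if (max != previous) {       // notifyOnChange: only propagate changes
            target.accept(max);      // push: hand the new max to the user object
        }
    }

    // Feeds temperatures through the chain and collects every pushed value.
    static List<Double> run(double... temps) {
        List<Double> pushed = new ArrayList<>();
        MaxTempChain chain = new MaxTempChain(v -> pushed.add(v));
        for (double t : temps) {
            chain.onTemp(t);
        }
        return pushed;
    }
}
```

Feeding in the six temperatures from the test (10, 9, 17, 16, 14, 24) pushes three values, 10, 17, and 24, which matches the three "new max temp" lines in the test output.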

To use method references on the TempEvent, we first need to define a getter/setter style accessor method pair. Of course, IDEs can generate the required methods, but the SNR will still drop after the generation. Expand this to a larger domain and the problem multiplies. Lombok can come to our rescue here, removing unnecessary code and restoring our SNR.

Before Lombok

public class InlineNoLombok {

    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");
    }


    public static class TempEvent extends Event {

        private double temp;

        public TempEvent(double temp) {
            this.temp = temp;
        }

        public double getTemp() {
            return temp;
        }

        public void setTemp(double temp) {
            this.temp = temp;
        }

    }

}


After Lombok

Adding a single @Data annotation removes the getter/setter, and the @AllArgsConstructor removes the constructor:

public class InlineLombok {

    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.lombok", "TempMonitor");
    }

    @Data
    @AllArgsConstructor
    public static class TempEvent extends Event {
        private double temp;
    }
}


Even with this smallest of examples using Lombok and Fluxtion together, the actual business logic is much easier to read. A better code SNR makes the application more efficient to build and easier to understand.

Flight Data Example

Let's extend this to a more complex example where the value of a high SNR becomes apparent. In this example, we are processing flight data for a whole year. The example was inspired by this blog, and the code for the Akka streaming solution is here. The summary of requirements:

Process a year's worth of all US flight landing records stored in CSV format here.

  • Group the carriers by name
  • Filter records that have a delay > 0
  • Carrier name: column 8, delay: column 14
  • For a carrier grouping calculate:
    • Cumulative sum of total delay
    • Total number of delayed flights
    • Average delay for a flight if it is late
  • Calculate the total count of flights regardless of the delay

We need to define data types and processing logic to solve the problem. It would be easy to be overwhelmed by the noise in the solution, but Fluxtion allows us to concentrate on the business logic and Lombok makes the data types easy to work with; both tools use inference to reduce the code we have to write. Using Java 10 var for intermediate variable declarations reduces the code noise further:

public class FlightAnalyser {

  @SepBuilder(
          name = "FlightDelayAnalyser",
          packageName = "com.fluxtion.articles.lombok.flight.generated"
  )
  public void buildFlightProcessor(SEPConfig cfg) {
    var flightDetails = csvMarshaller(FlightDetails.class, 1)
            .map(14, FlightDetails::setDelay).converter(14, defaultInt(-1))
            .map(8, FlightDetails::setCarrier).converter(8, Converters::intern).build();
    //filter and group by
    var delayedFlight = flightDetails.filter(FlightDetails::getDelay, positive());
    var carrierDelay = groupBy(delayedFlight, FlightDetails::getCarrier, CarrierDelay.class);
    //derived values for a group
    carrierDelay.init(FlightDetails::getCarrier, CarrierDelay::setCarrierId);
    carrierDelay.avg(FlightDetails::getDelay, CarrierDelay::setAvgDelay);
    carrierDelay.count(CarrierDelay::setTotalFlights);
    carrierDelay.sum(FlightDetails::getDelay, CarrierDelay::setTotalDelayMins);
    //make public for testing
    var delayByGroup = cfg.addPublicNode(carrierDelay.build(), "delayMap");
    //dump to console, triggers on EofEvent
    printValues("\nFlight delay analysis\n========================",
            delayByGroup, eofTrigger());
  }

  @Data //input data from CSV
  public static class FlightDetails {
    private String carrier;
    private int delay;
  }

  @Data //derived data
  public static class CarrierDelay {
    private String carrierId;
    private int avgDelay;
    private int totalFlights;
    private int totalDelayMins;
  }

}


Implementation Analysis

Lombok allows us to deal with data classes and field types, ignoring the scaffolding of getters/setters. We define an input record, FlightDetails, and the grouping summary record, CarrierDelay.

The use of the var keyword for intermediate instance assignment simplifies reading and writing the code.

  • In line 8, Fluxtion maps the CSV to the FlightDetails type; the 1 indicates an initial header line to ignore.
  • Line 9 maps column 14 to the delay field of FlightDetails. An optional converter function maps a missing or non-numeric delay to the value -1. Type inference by Fluxtion ensures a char-to-int conversion with zero GC.
  • Line 10 maps column 8 to the carrier name. The carrier name is interned to avoid unnecessary allocation of String objects, as we expect the same carrier names to appear many times. With 7 million records, this reduces GC pressure massively.
  • In line 12, the filter function, positive(), is applied to the field FlightDetails::getDelay; only delayed flights are processed by child nodes.
  • In line 13, the filtered records, delayedFlight, are grouped by the key FlightDetails::getCarrier; the target of the group is CarrierDelay.
  • Line 15 defines the initialization function for a new carrier entry in the group; it is called only when a new key is allocated in the group.
  • Line 16 applies the average function to delay and sets the value via CarrierDelay::setAvgDelay.
  • Line 17 applies the count function to the group and sets the value via CarrierDelay::setTotalFlights.
  • Line 18 applies the sum function to delay and sets the value via CarrierDelay::setTotalDelayMins.
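
The two converter steps can be illustrated without Fluxtion. The sketch below is plain Java with hypothetical helper names: parseOrDefault mimics the effect of a default-value converter (it makes no attempt to reproduce Fluxtion's allocation-free parsing), and internSharesInstance shows why repeated carrier codes end up sharing a single String instance once interned.

```java
class CsvFieldSketch {

    // Returns the parsed int, or the fallback when the field is missing or non-numeric.
    static int parseOrDefault(String field, int fallback) {
        if (field == null || field.isEmpty()) {
            return fallback;
        }
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // True when two separately allocated, equal strings intern to the same instance.
    static boolean internSharesInstance(String code) {
        String first = new String(code).intern();
        String second = new String(code).intern();
        return first == second;
    }
}
```

With a fallback of -1, a field that cannot be parsed is later rejected by the positive() filter, so malformed rows never reach the per-carrier calculations.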

The calculations are stateful and have unique values for each carrier; every time a FlightDetails record is received and the delay is greater than 0, the summary calculations update for the relevant carrier.

  • Line 20 assigns a delayMap as a public final variable to assist testing.
  • Line 22 prints the map values when an end-of-file event is received.
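
As a cross-check on what the grouping computes, here is a plain-Java sketch of the same filter, group-by, and per-carrier aggregates using a HashMap. It uses neither Fluxtion nor Lombok; the class names and the array-based input are assumptions for illustration. The average is truncated to an int to mirror the int avgDelay field, which is consistent with the reported results (for AA, 10414936 / 293277 is about 35.5 and is reported as 35).

```java
import java.util.HashMap;
import java.util.Map;

class CarrierDelaySketch {

    // Per-carrier summary, mirroring the fields of CarrierDelay.
    static class Summary {
        int totalFlights;   // number of delayed flights seen
        int totalDelayMins; // cumulative delay in minutes

        int avgDelay() {
            return totalDelayMins / totalFlights; // integer average
        }
    }

    // carriers[i] and delays[i] together describe one flight record.
    static Map<String, Summary> analyse(String[] carriers, int[] delays) {
        Map<String, Summary> byCarrier = new HashMap<>();
        for (int i = 0; i < carriers.length; i++) {
            if (delays[i] <= 0) {
                continue; // filter: only delayed flights reach the group
            }
            // group by carrier; a Summary is created the first time a key is seen
            Summary s = byCarrier.computeIfAbsent(carriers[i], k -> new Summary());
            s.totalFlights++;
            s.totalDelayMins += delays[i];
        }
        return byCarrier;
    }
}
```

Everything in this sketch other than the three aggregate updates is plumbing that the Fluxtion version leaves to inference.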

Performance

To execute the flight analysis for 2008, unzip the flight CSV data and pass the file location to the executable jar in the distribution.

java.exe -jar dist\flightanalyser.jar [FLIGHT_CSV_DATA]

  Flight delay analysis
========================
FlightAnalyser.CarrierDelay(carrierId=OO, avgDelay=31, totalFlights=219367, totalDelayMins=6884487)
FlightAnalyser.CarrierDelay(carrierId=AA, avgDelay=35, totalFlights=293277, totalDelayMins=10414936)
FlightAnalyser.CarrierDelay(carrierId=MQ, avgDelay=35, totalFlights=205765, totalDelayMins=7255602)
FlightAnalyser.CarrierDelay(carrierId=FL, avgDelay=31, totalFlights=117632, totalDelayMins=3661868)
FlightAnalyser.CarrierDelay(carrierId=DL, avgDelay=27, totalFlights=209018, totalDelayMins=5839658)
FlightAnalyser.CarrierDelay(carrierId=NW, avgDelay=28, totalFlights=158797, totalDelayMins=4482112)
FlightAnalyser.CarrierDelay(carrierId=UA, avgDelay=38, totalFlights=200470, totalDelayMins=7763908)
FlightAnalyser.CarrierDelay(carrierId=9E, avgDelay=32, totalFlights=90601, totalDelayMins=2907848)
FlightAnalyser.CarrierDelay(carrierId=CO, avgDelay=34, totalFlights=141680, totalDelayMins=4818397)
FlightAnalyser.CarrierDelay(carrierId=XE, avgDelay=36, totalFlights=162602, totalDelayMins=5989016)
FlightAnalyser.CarrierDelay(carrierId=AQ, avgDelay=12, totalFlights=1908, totalDelayMins=23174)
FlightAnalyser.CarrierDelay(carrierId=EV, avgDelay=35, totalFlights=122751, totalDelayMins=4402397)
FlightAnalyser.CarrierDelay(carrierId=AS, avgDelay=27, totalFlights=62241, totalDelayMins=1714954)
FlightAnalyser.CarrierDelay(carrierId=F9, avgDelay=21, totalFlights=46836, totalDelayMins=992044)
FlightAnalyser.CarrierDelay(carrierId=B6, avgDelay=42, totalFlights=83202, totalDelayMins=3559212)
FlightAnalyser.CarrierDelay(carrierId=WN, avgDelay=26, totalFlights=469518, totalDelayMins=12633319)
FlightAnalyser.CarrierDelay(carrierId=OH, avgDelay=34, totalFlights=96154, totalDelayMins=3291908)
FlightAnalyser.CarrierDelay(carrierId=HA, avgDelay=18, totalFlights=18736, totalDelayMins=342715)
FlightAnalyser.CarrierDelay(carrierId=YV, avgDelay=37, totalFlights=111004, totalDelayMins=4159465)
FlightAnalyser.CarrierDelay(carrierId=US, avgDelay=28, totalFlights=167945, totalDelayMins=4715728)

millis:2682


Processing performance analysis:

file size           = 673 MB
record count        = 7,009,728
processing time     = 2.682 seconds
bytes process rate  = 250 MB per second
record process time = 383 nanos per record
record process rate = 2.6 million records per second
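
The derived figures follow from three inputs: the file size, the record count, and the elapsed time. Here is a small sketch of the arithmetic, using the 2682 ms reported in the program output (the class and method names are hypothetical):

```java
class ThroughputSketch {

    // Records processed per second.
    static double recordsPerSecond(long records, long millis) {
        return records / (millis / 1000.0);
    }

    // Average time spent on one record, in nanoseconds.
    static double nanosPerRecord(long records, long millis) {
        return millis * 1_000_000.0 / records;
    }

    // Megabytes consumed per second.
    static double megabytesPerSecond(double megabytes, long millis) {
        return megabytes / (millis / 1000.0);
    }

    public static void main(String[] args) {
        long records = 7_009_728L;
        long millis = 2682L;
        System.out.println(recordsPerSecond(records, millis));
        System.out.println(nanosPerRecord(records, millis));
        System.out.println(megabytesPerSecond(673, millis));
    }
}
```

With these inputs the results come to about 2.61 million records per second, about 383 ns per record, and about 251 MB per second, in line with the figures listed.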


Comparing the two solutions, Fluxtion/Lombok with Akka streams, we observe the following:

  • The Java version uses less code than the Scala version
  • Fluxtion removes the need to define a graph, just business logic
  • Building a graph manually is a source of errors and adds code noise
  • Lombok makes data types as terse as Scala case classes
  • var reduces code bloat
  • The signal-to-noise ratio is high in the Java version, making the code easier to maintain and understand
  • Fluxtion is much easier to run; it requires no server setup — just compile and go.

It is difficult to compare performance numbers; the Akka version takes about a minute to run the example, but I do not have sufficient Akka experience to validate this. Additionally, it is an old blog, so the situation has probably moved on. The Java version runs in 2.7 seconds.

Conclusion

We set out to demonstrate that Java can be a terse language for event streaming if we select a good set of tools to use. Lombok and Fluxtion combine elegantly, allowing the declarative definition of processing logic to be both simple and type-safe. The use of var makes the code even more readable and easier to write. The key to all of this is inference; each tool infers a different type of behavior, and all of them save the coder from having to explicitly specify their intention, removing noise:

  •  var — type inference
  • Lombok — infers boilerplate implementation
  • Fluxtion — infers processing graph

In the case of Fluxtion, the comparison shows that the Akka version requires the developer to define the processing graph explicitly. This does not scale to larger, more complex situations and will be a source of errors. Even worse, the business logic is obscured by technical infrastructure, making maintenance even more costly in the future.

As a final note, the performance of the solution is excellent, processing 2.6 million records per second with zero GC. I hope you enjoyed this blog and are now tempted to try Fluxtion and Lombok.

Acknowledgements

AllSimon on GitHub: His comments while contributing to Fluxtion led me to experiment with Lombok.

Thanks!

Topics:
java ,streaming ,lombok ,high performance ,low latency message processing ,java 10 ,readability ,fluxtion ,var ,scala

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
