Event Stream Programming Unplugged — Part 1
This article introduces event stream programming and shows that it can be quite easy to use if we uncouple non-core concerns.
Welcome to event streaming unplugged; this is the first in a series of articles covering the topic of real-time event stream processing. This is a practical programming series with resources at unplugged part 1. What you will learn in these articles:
- The basics of unbounded event processing.
- Functional reactive programming introducing concepts like map, filtering, groupBy, and flatMap.
- Constructing directed acyclic execution graphs.
- Managing event streams of heterogeneous types.
- Imperative event programming, integrating application code.
- Input and output.
- Building, testing, and deploying.
- Auditing, logging, and debugging event execution flow.
The open-source event stream processing library Fluxtion is used to implement the examples. I hope to receive feedback on the library and make improvements.
Unplugged From What?
There are many great products and projects available that offer real-time stream processing capabilities, but in general, they come connected to an infrastructure service or some other unrelated concerns. This series is focused on processing logic, so we are unplugged from:
- Threads — No RxJava, Flow, or co-routines.
- Messaging infrastructure — No Kafka Streams, Amazon Kinesis.
- Grid processing — No Hazelcast Jet, Akka Streams.
We are solely concerned with the logic of processing event streams. Imagine if Java 8 streams were tightly coupled to a persistent store being present. Although useful in many situations, this would create confusion for developers and reduce the uptake of the library.
Event stream processing as a subject suffers from complexity overload; I strongly believe less is more in this domain. Simplicity is the key to understanding and deriving value from event stream programming. It is the application's responsibility to read data, create events, manage threads, integrate a stream processor, and route events to it.
Real-World Trading Experience
- If you are involved with algorithmic trading, you will probably come across this pattern.
- Almost all banks/funds have some homegrown stream processing derivative they use.
- The learning curve is steep but worth it.
- Very useful once understood; the danger is all problems are nails for the stream processing hammer.
- Hand-coding listeners and events becomes untenable over time, creating a large technical debt and forcing the use of a stream processing library.
- There is usually an existing messaging and threading infrastructure that cannot be changed. Processing logic engines must integrate easily.
- Single-threaded applications with little shared state reading from lock-free queues are generally faster.
- Applications that are driven purely from events are deterministic, easier to reason about, and simpler to test and replay from a historical data source.
- Determining event flow in the processing engine requires tool support; without it, developer productivity plummets.
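The single-threaded, lock-free-queue pattern mentioned above can be sketched in plain Java. This is an illustrative sketch only; `EventLoop` is a hypothetical name, not a class from any library mentioned here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Illustrative sketch: producers enqueue events from any thread; a single
// consumer thread drains the lock-free queue and dispatches in order, so the
// processing logic itself needs no locks or shared-state synchronization.
public class EventLoop {
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<Object> processor;

    public EventLoop(Consumer<Object> processor) {
        this.processor = processor;
    }

    // called from any producer thread
    public void publish(Object event) {
        queue.offer(event);
    }

    // called repeatedly from the single processing thread
    public void drain() {
        Object event;
        while ((event = queue.poll()) != null) {
            processor.accept(event); // deterministic, in-order dispatch
        }
    }
}
```

Because the only cross-thread structure is the queue, replaying the same event sequence through the processor reproduces the same results, which is what makes these applications easy to reason about and test.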
Hello World Event Stream
To get started, here is a five-minute hello world event stream primer before exploring a more complex example.
Listen to two unbounded data streams, and extract a value from a stream when a new event is received. Apply a binary function to the two values, in this case, add. If the sum is greater than 100, then log the value to the console. The function is stateless, but the streams move independently and must preserve the last value received to apply as an argument to the function. The function and dependent filter are exercised if either event stream updates.
Code Example
public class HelloWorld {
    public static void main(String[] args) {
        //builds the EventProcessor
        EventProcessor eventProcessor = Fluxtion.interpret(cfg -> {
            var data1Stream = subscribe(Data1.class)
                    .console("rcvd -> {}")
                    .mapToDouble(Data1::value);
            subscribe(Data2.class)
                    .console("rcvd -> {}")
                    .mapToDouble(Data2::value)
                    .map(Double::sum, data1Stream)
                    .filter(d -> d > 100)
                    .console("OUT: sum {} > 100");
        });
        //init and send events
        eventProcessor.init();
        //no output < 100
        eventProcessor.onEvent(new Data1(20.5));
        //no output < 100
        eventProcessor.onEvent(new Data2(63));
        //output > 100 - log to console
        eventProcessor.onEvent(new Data1(56.8));
    }

    public record Data1(double value) {
    }

    public record Data2(double value) {
    }
}
Lines 4-15: Creating an in-memory event processor with:
Fluxtion#interpret(Consumer<SEPConfig> cfg)
The instance returned is an event processor that is a target for application events and contains all processing logic. The next ten lines are the construction logic for the execution graph.
Lines 5-11: Subscribe to two independent unbounded streams for data types Data1 and Data2. The subscribe command creates a node in the graph connected to incoming events of that type. Extract the double value for each stream with a map operation. A console operation peeks into the stream and logs the value when the double value is updated. A fluent style API chains the map operations as a child of the parent node.
Line 12: Apply the binary function Double#sum if either parent stream updates and store the result in this stream. The two streams are joined at the binary function, effectively creating a connected graph.
Line 13: Apply the filter every time the sum updates; if the filter returns true, the value of the sum is published to the child operation; in this case, log to the console.
Events are sent to the event processor with a call to EventProcessor#onEvent. The processor will take care of all dispatch logic, state management, and invoking functions with the expected values and in the correct order.
Execution Output
rcvd -> Data1[value=20.5]
rcvd -> Data2[value=63.0]
rcvd -> Data1[value=56.8]
OUT: sum 119.8 > 100
Comparison to Java 8 Streams
On the surface, the code looks very similar to Java 8 streams, intentionally so. So what are the differences in behavior from Java 8 streams?
- The data is unbounded; in Java streams, finite data sets are processed, and a terminal operation triggers the processing. With the event processor, a new event initiates a process cycle.
- The event processor is long-lived, and the state is held in the graph, such as the last event value received. Streams are essentially stateless objects that are transient and cannot be re-used.
- Streams are single pipelines with a single execution path; an event processor is a graph of nodes with an arbitrary number of execution paths.
- Streams only handle a single event type; the event processor supports an arbitrary number of input event types.
- Event processors have a lifecycle: they can be started and stopped; streams are one-shot in operation.
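The statefulness difference can be shown without any library. The hand-rolled handler below keeps the last value received per input type, so a new event on either side re-triggers the sum; a `java.util.stream` pipeline, by contrast, needs the full data set up front and terminates after one pass. `StatefulSum` is an illustrative name, not part of any library.

```java
// Illustrative: a Java 8 stream is bounded and one-shot, while an event
// handler is long-lived and keeps state between events. Each onDataX call
// is one "event"; the other side's last value is remembered in a field.
public class StatefulSum {
    private double last1 = Double.NaN;
    private double last2 = Double.NaN;

    public double onData1(double v) {
        last1 = v;
        return last1 + last2; // NaN until both sides have fired at least once
    }

    public double onData2(double v) {
        last2 = v;
        return last1 + last2;
    }
}
```

Feeding it the hello world event sequence (20.5, then 63.0, then 56.8) reproduces the same running sums the event processor computes.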
Processing Graph
Under the covers, the call to Fluxtion#interpret generates an event processing graph that will be used to process events; the graph is a topologically sorted set of nodes. An event processor ensures nodes are invoked in topological order and only if one of their parents has been invoked previously. The hello world event processor graph as a diagram:
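The topological ordering idea can be sketched with Kahn's algorithm on the hello world graph. This is a minimal illustration of the sorting concept, not Fluxtion's actual internals; `Topo` and the node names are hypothetical.

```java
import java.util.*;

// Illustrative Kahn's algorithm: produce a dispatch order in which every
// node appears after all of its parents, as an event processor requires.
public class Topo {
    public static List<String> sort(Map<String, List<String>> children) {
        // count incoming edges for every node
        Map<String, Integer> inDegree = new HashMap<>();
        children.keySet().forEach(n -> inDegree.putIfAbsent(n, 0));
        children.values().forEach(cs -> cs.forEach(c -> inDegree.merge(c, 1, Integer::sum)));
        // seed with the root nodes (the subscriptions)
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((n, d) -> { if (d == 0) ready.add(n); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String n = ready.poll();
            order.add(n);
            // a child becomes dispatchable once all its parents are placed
            for (String c : children.getOrDefault(n, List.of())) {
                if (inDegree.merge(c, -1, Integer::sum) == 0) ready.add(c);
            }
        }
        return order;
    }
}
```

For the hello world graph (data1 and data2 feeding sum, sum feeding filter), any order this produces places sum after both subscriptions and the filter last.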
Fluxtion
The open-source Fluxtion library has been created from my experience in various funds and banks and the demands of event stream processing. The library is confined to processing logic and the building of complex processing graphs. Advanced features such as ahead-of-time source code generation and zero GC are supported. I hope to get feedback from people following the tutorials and improve the end product. The Fluxtion streaming API is intended to be similar to the Java Streams API to reduce the learning curve for developers new to the event stream programming paradigm.
Real-Time Trade Calculation
The hello world example is a gentle introduction easily implemented with a handwritten solution. We next address a problem of medium complexity to demonstrate the advantage of using a framework over a bespoke solution. See here for the code example.
Requirements
- Publish to an application component, mark to market, positions, and overall profit for a set of traded instruments.
- Values are updated whenever there is a new trade, an instrument price update, or a reset action.
- Batches of trades should result in a single update.
- The system supports a reset action returning all values to their initial state.
- Only publish values that have changed.
Input Events From the Application
- Trade
- Instrument prices
- Reset signal
Outputs Published to Listeners
- Mark to market
- Positions
- Profit
Calculations
- One FX trade creates two position changes, one for each currency, positive for a buy and negative for a sell.
- Currency positions from each trade contribute to a cumulative position for that currency.
- The position for a currency is used to calculate the market value relative to another currency, in this case, USD.
- If no currency->USD rate is available, the mark-to-market value is NaN for that currency position.
- The sum of all currency mark to market values is the overall profit or loss of the trading position.
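The calculation rules above can be checked in isolation with plain Java, independently of the event processor. This is an illustrative sketch; `PositionCalc` and its method names are hypothetical, but the rules (two signed legs per trade, cumulative positions, NaN when no USD rate exists) come straight from the list above.

```java
import java.util.Map;

// Illustrative: apply the position and mark-to-market rules from the text.
public class PositionCalc {
    // one FX trade yields two signed currency position changes,
    // positive for a buy and negative for a sell
    public static void applyTrade(Map<String, Double> positions,
                                  String dealtCcy, double dealtAmt,
                                  String contraCcy, double contraAmt) {
        positions.merge(dealtCcy, dealtAmt, Double::sum);   // cumulative sum per currency
        positions.merge(contraCcy, contraAmt, Double::sum);
    }

    // mark a currency position to market in USD; NaN when no ccy->USD rate is known
    public static double markToMarket(String ccy, double position,
                                      Map<String, Double> usdRates) {
        if (ccy.equals("USD")) return position; // base currency is already in USD
        return usdRates.getOrDefault(ccy, Double.NaN) * position;
    }
}
```

Summing the mark-to-market values over all currencies then gives the overall profit or loss, exactly as `totalProfit` does later in the article.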
Solution
The interesting code for the solution is all in one class, TradingCalculator.java, with three main sections.
- Building the graph — Describes and constructs an instance of the event processor that supports the business logic.
- Public service methods — A set of methods for use by the enclosing application; each method results in one or more events pushed to the graph.
- Static helper functions — Stateless functions that are invoked by various nodes in the event processor.
Building the Graph
All of the constructs will be covered in greater detail in future articles; the point of this article is to compare building a similar solution with an imperative model, so the code breakdown is brief.
private void buildProcessor(SEPConfig config) {
    var resetTrigger = subscribeToSignal("reset");
    var publishTrigger = subscribeToSignal("publish");
    var assetPosition = subscribe(Trade.class)
            .flatMap(Trade::tradeLegs)
            .groupBy(TradeLeg::id, TradeLeg::amount, Aggregates.doubleSum())
            .resetTrigger(resetTrigger);
    var assetPriceMap = subscribe(PairPrice.class)
            .map(TradingCalculator::toCrossRate)
            .groupBy(Trade.AssetPrice::id, Trade.AssetPrice::price)
            .resetTrigger(resetTrigger);
    var posDrivenMtmStream = assetPosition.map(GroupByStreamed::keyValue)
            .map(TradingCalculator::markToMarketPosition, assetPriceMap.map(GroupBy::map))
            .updateTrigger(assetPosition);
    var priceDrivenMtMStream = assetPriceMap.map(GroupByStreamed::keyValue)
            .map(TradingCalculator::markToMarketPrice, assetPosition.map(GroupBy::map))
            .updateTrigger(assetPriceMap);
    //Mark to market to sink as a map
    var mtm = posDrivenMtmStream.merge(priceDrivenMtMStream)
            .groupBy(KeyValue::getKey, KeyValue::getValueAsDouble)
            .resetTrigger(resetTrigger)
            .map(GroupBy::map)
            .updateTrigger(publishTrigger)
            .filter(Predicates.hasMapChanged())
            .sink("mtm");
    //Positions to sink as a map
    assetPosition.map(GroupBy::map)
            .updateTrigger(publishTrigger)
            .filter(Predicates.hasMapChanged())
            .sink("positions");
    //sum of mtm is profit
    mtm.mapToDouble(TradingCalculator::totalProfit)
            .filter(Predicates.hasDoubleChanged())
            .sink("profit");
}
Lines 2-3: Because data is always live, we sometimes need to override when events are propagated from a node. The subscriptions connect a node to a signal event, keyed by a string key. These nodes are used later in the graph for triggering publishing and resetting behavior.
Lines 5-8: Publishes a position map. Subscribes to Trade objects and uses a flatMap to create an iteration over the trade legs. A groupBy partitions by key, applies a cumulative sum function for each key, and stores the result in a map. This is the whole position calculation. GroupBy is a stateful node holding the keys and values in a map. A GroupBy node can be reset, clearing the underlying map; the reset trigger is connected to the reset signal node above.
Lines 10-12: Publishes an asset price to base currency map. Subscribes to PairPrice and uses groupBy to partition into a map keyed by asset currency. A map function calls into a static user function TradingCalculator#toCrossRate to calculate the mark-to-market rate.
Lines 15-17: Creates a stream of mark-to-market updates for an asset, triggered when the assetPosition has updated. TradingCalculator#markToMarketPosition is a stateless binary function and would trigger if either input changes, so we override the update trigger.
Lines 19-21: Creates a stream of mark-to-market updates for an asset, triggered when the assetPriceMap has updated.
Lines 24-30: Publishes the mark-to-market map to a sink when there is a change in the map. The two mark-to-market streams are merged into a single update stream. The update is merged into a map, and the whole map is published downstream (line 27) to an external sink. A filter gates the update to the sink, checking for a change against the previously published version. Triggers for resetting and overriding the update are provided. Batches of trades are handled; the update is published when the batch has finished processing.
Lines 33-36: Publishes the position map to a sink. The update trigger is overridden, and the changed filter is applied to the output.
A note on input and output:
- Subscriptions feed data from the application into the event processor.
- Sinks push data from a node in the event processor to an external consumer.
Mapping to Events
Service methods are mapped into events for consumption by the event processor. The event processor supports helper methods for sending keyed signals and registering sinks for output; these utility methods are only wrappers around sending events to the event processor.
public TradingCalculator() {
    streamProcessor = Fluxtion.interpret(this::buildProcessor);
    streamProcessor.init();
}

public void processTrade(Trade trade) {
    System.out.println("\nrcvd trade -> " + trade);
    streamProcessor.onEvent(trade);
    streamProcessor.publishSignal("publish");
}

public void priceUpdate(PairPrice price) {
    System.out.println("\nrcvd price -> " + price);
    streamProcessor.onEvent(price);
    streamProcessor.publishSignal("publish");
}

public void reset() {
    System.out.println("\nreset");
    streamProcessor.publishSignal("reset");
    streamProcessor.publishSignal("publish");
}

public void markToMarketConsumer(Consumer<Map<String, Double>> listener) {
    streamProcessor.addSink("mtm", listener);
}

public void positionsConsumer(Consumer<Map<String, Double>> listener) {
    streamProcessor.addSink("positions", listener);
}

public void profitConsumer(DoubleConsumer listener) {
    streamProcessor.addSink("profit", listener);
}
Helper Functions
The graph invokes a set of static helper methods, referenced in #map invocations in the graph construction. These functions are specific to the application's needs but are not special; library functions like Double#sum can be used if suitable.
public static KeyValue<String, Double> markToMarketPrice(
        KeyValue<String, Double> assetPrice, Map<String, Double> assetPositionMap) {
    if (assetPrice == null || assetPositionMap.get(assetPrice.getKey()) == null) {
        return null;
    }
    return new KeyValue<>(assetPrice.getKey(),
            assetPositionMap.get(assetPrice.getKey()) * assetPrice.getValue());
}

public static KeyValue<String, Double> markToMarketPosition(
        KeyValue<String, Double> assetPosition, Map<String, Double> assetPriceMap) {
    if (assetPosition == null) {
        return null;
    }
    if (assetPosition.getKey().equals(baseCurrency)) {
        return new KeyValue<>(assetPosition.getKey(), assetPosition.getValue());
    }
    if (assetPriceMap == null) {
        return new KeyValue<>(assetPosition.getKey(), Double.NaN);
    }
    return new KeyValue<>(
            assetPosition.getKey(),
            assetPriceMap.getOrDefault(assetPosition.getKey(), Double.NaN) * assetPosition.getValue());
}

public static double totalProfit(Map<String, Double> m) {
    return m.values().stream().mapToDouble(Double::doubleValue).sum();
}

public static AssetPrice toCrossRate(PairPrice pairPrice) {
    if (pairPrice.id().startsWith(baseCurrency)) {
        return new AssetPrice(pairPrice.id().substring(3), 1.0 / pairPrice.price());
    }
    return new AssetPrice(pairPrice.id().substring(0, 3), pairPrice.price());
}
Running the Example
Below is the actual output for running the main program. It is worth noting in the output how filters only publish data if there has been a change to the upstream value. This can greatly reduce the load on downstream systems.
Main Program
Creates the trading calculator, registers sinks for displaying output to the console and then exercises the service interface methods of the calculator.
public static void main(String[] args) {
    TradingCalculator tradingCalculator = new TradingCalculator();
    //add listeners for output
    tradingCalculator.markToMarketConsumer(
            m -> System.out.println("Asset mark to market\t:" + m));
    tradingCalculator.positionsConsumer(
            m -> System.out.println("Asset positions\t\t\t:" + m));
    tradingCalculator.profitConsumer(
            d -> System.out.println("Total trading profit\t:" + d));
    //send trades and rates
    tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
    tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
    tradingCalculator.processTrade(Trade.sold("EURCHF", 120d, 100d));
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
    tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.2));
    tradingCalculator.processTrade(Trade.bought("GBPJPY", 20d, 26000d));
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.0));
    //reset
    tradingCalculator.reset();
    //trade batch after reset
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
    tradingCalculator.priceUpdate(new PairPrice("GBPUSD", 1.25));
    tradingCalculator.priceUpdate(new PairPrice("USDJPY", 202));
    tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.25));
    tradingCalculator.processTrades(
            Trade.bought("EURUSD", 20d, 11d),
            Trade.bought("GBPJPY", 20d, 26000d),
            Trade.sold("EURCHF", 120d, 100d)
    );
}
Execution Output
The output is self-explanatory and demonstrates the functionality for:
- Position calculations.
- Mark-to-market calculations.
- Total profit calculation.
- Only publishing changes to sinks.
- Resetting to a zero state.
- Handling sets of trades as a batch.
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
Asset mark to market :{EUR=NaN, USD=-130.0}
Total trading profit :NaN
Asset positions :{EUR=250.0, USD=-130.0}
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
Asset mark to market :{EUR=NaN, USD=-260.0}
Asset positions :{EUR=500.0, USD=-260.0}
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
Asset mark to market :{CHF=NaN, EUR=NaN, USD=-260.0}
Asset positions :{CHF=100.0, EUR=380.0, USD=-260.0}
rcvd price -> PairPrice[id=EURUSD, price=1.5]
Asset mark to market :{CHF=NaN, EUR=570.0, USD=-260.0}
rcvd price -> PairPrice[id=USDCHF, price=1.2]
Asset mark to market :{CHF=83.33333333333334, EUR=570.0, USD=-260.0}
Total trading profit :393.33333333333337
rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=570.0, GBP=NaN, USD=-260.0}
Total trading profit :NaN
Asset positions :{CHF=100.0, JPY=-26000.0, EUR=380.0, GBP=20.0, USD=-260.0}
rcvd price -> PairPrice[id=EURUSD, price=1.0]
Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=380.0, GBP=NaN, USD=-260.0}
reset
Asset mark to market :{}
Total trading profit :0.0
Asset positions :{}
rcvd price -> PairPrice[id=EURUSD, price=1.5]
rcvd price -> PairPrice[id=GBPUSD, price=1.25]
rcvd price -> PairPrice[id=USDJPY, price=202.0]
rcvd price -> PairPrice[id=USDCHF, price=1.25]
Trade batch - start
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=20.0], contra=TradeLeg[id=USD, amount=-11.0]]
rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
Trade batch - complete
Asset mark to market :{CHF=80.0, JPY=-128.7128712871287, EUR=-150.0, GBP=25.0, USD=-11.0}
Total trading profit :-184.7128712871287
Asset positions :{CHF=100.0, JPY=-26000.0, EUR=-100.0, GBP=20.0, USD=-11.0}
Comparing to an Imperative Solution
As with any imperative-versus-functional comparison, the usual arguments apply. Functional solutions are not always the best fit for every problem when compared to imperative approaches, but in this case, I think the balance is in favor of the functional style.
State
There are at least six items of state for maps and filters. These would have to be coded and managed correctly in the imperative solution. All of this state is handled transparently in the functional approach.
Dispatch
With an imperative implementation, connecting the state to the calculations has to be manually written. Specific entry methods for each type of input need to be created. Connecting the output to the application requires listener registration and mechanical code that adds no value.
Triggering
All the triggering of calculations must be manually written and tested; filters must be invoked in the right place and in the right order. Again, the functional approach takes care of all this logic transparently.
Control Signals
Resetting and publishing signals need to be coordinated with state management.
Testing
Helper functions are easily testable as external static functions. Sinks can be registered with the processor, and a set of events fired in. This is a repeatable testing approach for any event processor. A custom set of tests has to be written for the imperative case. If there is a problem in the live application, we can guarantee the situation can be reproduced in the functional case if the event stream is captured. The same guarantees cannot be made about the imperative solution.
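The capture-and-replay testing approach can be sketched generically. `ReplayHarness` below is an illustrative name, not a Fluxtion class: events are recorded once, then replayed into a fresh processor whose sink output is asserted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative: record a live event stream, then replay it into a fresh
// processor and assert on what the sink received. Because processing is
// purely event-driven, replaying the same events in the same order
// reproduces the live run exactly.
public class ReplayHarness {
    private final List<Object> recorded = new ArrayList<>();

    // capture each live event as it occurs
    public void record(Object event) {
        recorded.add(event);
    }

    // push the captured events, in order, into a processor; the sink's
    // captured output is returned for assertions
    public <T> List<T> replay(Consumer<Object> processor, List<T> sinkCapture) {
        recorded.forEach(processor);
        return sinkCapture;
    }
}
```

The same harness works for any event processor: record in production, replay in a unit test, and compare the sink output against the expected values.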
Composing Complexity
Complexity and functions are composed in the construction phase. This is easy to alter and update, as the pattern is well understood. The imperative solution is freeform; any approach could have been formulated and built. This will require time to understand and fix bugs for anyone unfamiliar with the code base.
Conditional Logic
There are no explicit conditional statements in the functional solution. In the imperative approach, the code would be littered with if/else statements; each has to be understood and behave correctly. Poor conditional logic is a rich source of bugs.
Reasoning About the Logic
Because a repeatable construction pattern is followed, there is virtually no learning curve for a new developer to understand the code and fix bugs. Because the triggering and order of execution are predictable, there is no need to understand a custom event dispatch solution, again making the learning curve easier.
Code Size
The core imperative solution may be smaller in code. Still, the other requirements, such as listener registration, filters, state management, declaring variables, exposing methods for testing, etc., will create code noise and probably make the solution larger than the functional definition.
I hope this article has created interest in event stream programming and demonstrated that its use can be quite easy if we uncouple non-core concerns. Comparing the imperative approach to the functional approach shows the value of the latter in this case. As complexity rises, the functional case becomes even more compelling.
Published at DZone with permission of Greg Higgins. See the original article here.