Understanding Bistro Streams: Counting Clicks by Region
In this post, we go over a real world use case in which we use this open source data streaming platform to collect user data.
Join the DZone community and get the full member experience.
Join For FreeIntroduction
Bistro Streams [1] is a light-weight stream and batch processing library which radically changes the way stream data is processed by relying on a new column-oriented approach to data processing. Although its features are focused on stream analytics with applications in IoT and edge computing, it can also be applied to batch processing, including such tasks as data integration, data migration, extract-transform-load (ETL) or big data processing. Bistro Streams is based on novel principles including conceptual design and physical architecture by relying on Bistro Engine library [2] and this technology can be shortly described as follows:
Bistro Streams does for stream analytics what column stores did for databases.
More specifically, Bistro Streams has the following major distinguishing features:
Bistro Streams defines its data processing logic using column operations as opposed to using only set operations in traditional systems including Kafka Streams and Flink. Bistro makes columns and column operations first-class elements of data modeling and data processing. In particular, Bistro Streams does not use such difficult to comprehend and execute operations like join, group-by or reduce.
Bistro Streams also has a column-oriented physical representation of data (in-memory). The idea is apparently not new and widely used in column stores but it is new for stream processing. In the case of long histories (which is needed for complex analysis) and complex analytic workflows, it can provide higher performance. It is also important for running on edge devices with limited resources.
The third feature of Bistro Streams is how it organizes data processing in time. Bistro Streams separates the logic of (1) triggering the processes for appending, evaluating, and deleting data from the logic of (2) what to do during evaluation, that is, data processing itself. In particular, the frequency and conditions for starting evaluations are specified using a separate API. The same for retention policies where deletion time is determined independently of the data processing operations applied to data.
Bistro Streams also does not distinguish between batch and stream processing. In particular, the data is internally represented in the same way for both workloads and the difference is only in how frequently the state is updated. Therefore, it can be applied to a wider range of problems and use cases including batch processing, data ingestion, ETL or real-time stream processing. The data processing logic is always the same.
In this article, we provide an introduction to Bistro Streams by demonstrating its basic features using an example [3] where we need to count the number of clicks made by users from different regions.
Problem
Let us assume we have an input stream sending asynchronous events. Each event contains a username and number of clicks made by the user. For example, a sequence of messages could be as follows: {"User": "Alice", "Count": 3}
, {"User": "Bob", "Count": 5}
, {"User": "Max", "Count": 2}
, and so on.
Each user originates from some region which can be retrieved from the table of users. This means that we can convert a sequence of usernames into a sequence of their regions like "Americas"
, "Americas"
, "Europas"
for the above messages.
Our goal is to continuously monitor the number of clicks originating from each region for the latest period of time, for example, for the last 10 seconds. We want to regularly update this information, for example, every 2 seconds.
If we represent all regions as a table then the first column would store region names and the second column would store the number of clicks received from it for the last 10 seconds. For example, we could see that there were 111 clicks from Americas and 222 clicks from Europas for the last 10 seconds. However, in 2 seconds, these numbers will probably change. Of course, the period for moving aggregation (10 seconds) and the frequency of updating the result (2 seconds) can be adjusted if necessary.
In Bistro Streams, the data processing functions are distributed between the following layers:
Data state is essentially a database, that is, a number of tables and columns.
Data processing logic is encapsulated into table and column definitions which involve user-defined functions written in Java. Defining a table means that it will be populated from other tables and defining a column means that its values will be evaluated from other columns.
Actions which are submitted to the server and describe what needs to be done, for example, add new records or provide an output or evaluate the expressions.
In the following sections, we will describe how these tasks are implemented.
Solution
Define Data Schema
Before we can process data we need to define where it will be stored and, for that purpose, we define a number of tables in the same way as we do it for databases. In our case, we need three tables:
CLICKS
: This table will store source events and hence it should have a column with the username and a column for the number of clicks. In addition, it will need a (derived) column which references this user record in theUSERS
table.USERS
: This table stores all users and their properties. In particular, it has columns for user name and user region. We will find the region for each user from this table. It needs also a (derived) column which directly references records from theREGIONS
tables.REGIONS
: This table is a list of regions and it also has a column which will store the results of analysis, that is, the total number of clicks for some period of time. Therefore, it has two columns: region name and the total number of clicks.
Here is the code for defining these schema elements in Bistro:
Schema schema = new Schema("Example 4");
Table regions = schema.createTable("REGIONS");
Column regionName = schema.createColumn("Name", regions);
Column regionClicks = schema.createColumn("Clicks", regions);
Table users = schema.createTable("USERS");
Column userName = schema.createColumn("Name", users);
Column userRegion = schema.createColumn("Region", users);
Table clicks = schema.createTable("CLICKS");
Column clickTime = schema.createColumn("Time", clicks);
Column clickUser = schema.createColumn("User", clicks);
Column clickCount = schema.createColumn("Count", clicks);
Define Data Processing Logic
Data processing in Bistro Streams is performed in so-called derived columns and derived tables. These are normal columns and tables but their data is computed from the data in other columns and tables which in turn can be derived elements.
The idea of data processing is that for each new record in the CLICKS
table, we need to find the user record in the USERS
table and then for this user find its region in the REGIONS
table. And then we can update the number of clicks for this region by increasing its current value in the Clicks
columns. For example, if we get a new event {"User": "Alice", "Count": 3}
then we find the record for the username "Alice"
and the record for the region "Americas"
. Finally, we add 3 to the column Count
of this region record, so if it had 10 clicks then it will have 13 clicks.
First, we need to define two so-called link columns for the tables CLICKS
and USERS
which will reference the USERS
and REGIONS
tables, respectively. This can be done as follows:
// Link column: CLICKS -> USERS
Column clickUserLink = schema.createColumn("User Link", clicks, users);
clickUserLink.link(
new Column[] { clickUser },
userName
);
// Link column: USERS -> REGIONS
Column userRegionLink = schema.createColumn("Region Link", users, regions);
userRegionLink.project(
new Column[] { userRegion },
regionName
);
Note that the second column is defined using the project operation. It is equivalent to the link column except that the target record will be added if not found. We decided to use a project column (and not link column) because it will automatically populate the REGIONS
table from the data in the USERS
table. Alternatively, we could populate the regions by loading this data from an external data source and then link to it from the table of users.
Once the tables have been connected, we can define the aggregation logic. It is done by defining a so-called accumulate column in the REGIONS
tables.
regionClicks.setDefaultValue(0.0);
regionClicks.accumulate(
new ColumnPath(clickUserLink, userRegionLink),
(a,p) -> (double)a + (double)p[0], // Add the clicks when an event is received
(a,p) -> (double)a - (double)p[0], // Subtract clicks when the event is deleted
new ColumnPath(clickCount) // Measure to be aggregated
);
This definition means that we group all records from the CLICKS
table depending on the region they belong to. And then we sum up the values in the Clicks
column of the original events for one group.
Importantly, it is done differently than normal when using group-by or reduce. Here we define two functions: one will process a fact when it is added to the source table (adder), and the other will process a fact when it is removed from the source table (remover). The intention is to keep only events during some time (10 seconds in our example) and then delete them. The system will not compute the aggregate for the whole window each time we want to get the result. Instead, the result will be continuously updated as new events are appended and when old events are deleted. It is important in the case of long windows, for example, if we wanted to monitor the number of clicks received for 1 day.
Actions and Data Evaluation Logic
Once the logic of data processing has been defined, we need to determine the sequence of operations with the data. In particular, it is necessary to define how data is fed into the system, when it is processed, when data is removed from the system, and how we view the result.
In our example, we use a simulator to produce input events. It is implemented by the ClickSimulator
class:
ClickSimulator simulator = new ClickSimulator(server, clicks, 1000);
Once this connector instance is started, it will generate events as described at the beginning with some random click counts and with random time delays. For each new event, it will create an ActionAdd
and submit it to the server:
Action action = new ActionAdd(this.table, record);
Task task = new Task(action, null);
this.server.submit(task);
The result of executing this action by the server is a new record in the CLICKS
table. In real applications, we could subscribe to a topic in some message bus or we could receive events via an HTTP listener. If we now start our server then the simulator will generate new events and they will be stored in this table which will continuously grow and no processing will be done.
In order to trigger data processing, we define a timer. It is actually also a connector but it wakes up at regular time intervals in order to trigger some processing or perform any other actions. We want to do data processing every 2 seconds and, therefore, we configure the timer accordingly by passing this frequency as a constructor parameter:
ConnectorTimer timer = new ConnectorTimer(server,2000);
What do we want to do every 2 seconds? Our goal is to aggregate the click counts for the last 10 seconds and hence we need to ensure that we have only data for this period and not more. This can be done by submitting a standard ActionRemove
and specifying that it has to remove all records older than 10 seconds:
timer.addAction(
new ActionRemove(clicks, clickTime, Duration.ofSeconds(Example4.windowLengthSeconds))
);
If we start the server now then the input CLICKS
table will not continuously grow anymore and it will contain only records no older than 10 seconds. Yet, no data processing will be performed.
Now we can really process data by evaluating its derived columns which is done by submitting a standard ActionEval
:
timer.addAction(
new ActionEval(schema)
);
Note that the system will process only new and deleted records by updating the totals.
Now all the totals are up-to-date and we can choose how to output this information. In our example, we simply print the numbers in the console for each region:
timer.addAction(
x -> {
System.out.print("=== Totals for the last " + Example4.windowLengthSeconds + " seconds: ");
Range range = regions.getIdRange();
for(long i=range.start; i<range.end; i++) {
String name = (String) regionName.getValue(i);
Double count = (Double) regionClicks.getValue(i);
System.out.print(name + " - " + count + " clicks; ");
}
System.out.print("\n");
}
);
We can now start the server and observe the running click counts for incoming events:
=== Totals for the last 10 seconds: Americas - 50.0 clicks; Europas - 33.0 clicks;
=== Totals for the last 10 seconds: Americas - 42.0 clicks; Europas - 42.0 clicks;
=== Totals for the last 10 seconds: Americas - 58.0 clicks; Europas - 28.0 clicks;
=== Totals for the last 10 seconds: Americas - 70.0 clicks; Europas - 29.0 clicks;
Normally, however, the results will be written to an output connector which can store them in a database or send to a message bus.
Conclusion
Bistro Streams uses a new data processing paradigm by relying on a column-oriented data model and in-memory storage model. This approach is especially beneficial for complex analytics and stream processing because column operations are frequently much simpler at design time and more efficient at runtime. What is important is that Bistro Streams can be used for both batch and stream analytics because the way data is processed and how frequently it is processed is under full control of the developer. In particular, there is no problem in doing stream and batch analytics simultaneously by processing fast data from an input stream and loading large quantities of data from a persistent data store. Finally, Bistro Streams is implemented as a light-weight software library which can be embedded into devices or run at the edge without the need to provision complex infrastructure [4]. This makes it a good candidate for application in IoT and other areas where near real-time response is required.
The source code of this example can be checked out from [3] and more information can be found in [1] and [2].
Links
[1] Bistro Streams: https://github.com/asavinov/bistro/tree/master/server
[2] Bistro Engine: https://github.com/asavinov/bistro/tree/master/core
[3] Source code of the example described in the article: https://github.com/asavinov/bistro/blob/master/examples/src/main/java/org/conceptoriented/bistro/examples/server/Example4.java
[4] Is Your Stream Processor Obese? https://dzone.com/articles/is-your-stream-processor-obese
Opinions expressed by DZone contributors are their own.
Trending
-
How To Check IP Addresses for Known Threats and Tor Exit Node Servers in Java
-
Auditing Tools for Kubernetes
-
Logging Best Practices Revisited [Video]
-
Effortlessly Streamlining Test-Driven Development and CI Testing for Kafka Developers
Comments