Aggregating REST and Real-Time Data Sources
Learn how to aggregate REST and real-time data sources using the open source Swim platform.
Aggregating data from heterogeneous REST APIs and streaming sources can be a pain. To achieve real-time insights or visualizations, developers need to combine REST and streaming data sources efficiently. But streaming data is created continuously, and storing high volumes of raw stream logs before processing requires significant storage and bandwidth. Furthermore, just getting data into a database doesn’t help you build an application, and it certainly doesn’t help if you’re aiming for real-time performance. So what’s the fastest way to integrate multiple heterogeneous data sources so that developers can aggregate and transform the combined data streams in real time?
Standardizing Data Streams Using Web Agents
The first step is to standardize heterogeneous data feeds into a unified format. One way to do this, which has the added benefit of implicit state management, is to use the open source Swim platform. Swim uses stateful Web Agents to provide a uniform way to handle data. Using Web Agents, streaming data and stateless REST API feeds are both converted into stateful WARP data streams. Web Agents can then subscribe to peers to create aggregations. These aggregator Web Agents can be treated by application developers as a single, new data source and consumed via a unique API.
Here’s an example of a Swim Web Agent:
// swim/basic/UnitAgent.java
package swim.basic;
import swim.api.agent.AbstractAgent;
public class UnitAgent extends AbstractAgent {
  private void logMessage(Object msg) {
    System.out.println(nodeUri() + ": " + msg);
  }
}
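To make the idea of standardizing feeds concrete without depending on Swim, here is a minimal Python sketch of the normalization step alone. It converts one polled REST response and one streamed event into the same record shape. The `Update` record, both helper functions, the field names (`id`, `reading`), and the `sensor,value` event format are hypothetical; they only model the node/lane/value addressing used in this article and are not part of the Swim API.

```python
import json
from typing import List, NamedTuple


class Update(NamedTuple):
    """Unified record: which entity, which attribute, and the new value."""
    node: str
    lane: str
    value: float


def from_rest_snapshot(body: str) -> List[Update]:
    """Turn one polled REST response (a JSON array of objects) into updates."""
    return [
        Update("/unit/" + str(item["id"]), "reading", float(item["reading"]))
        for item in json.loads(body)
    ]


def from_stream_event(line: str) -> Update:
    """Turn one streamed event of the form 'sensor,value' into an update."""
    sensor, raw = line.strip().split(",")
    return Update("/unit/" + sensor, "reading", float(raw))


# Both kinds of input end up as the same kind of record.
rest_body = '[{"id": "a", "reading": 20.5}, {"id": "b", "reading": 19}]'
updates = from_rest_snapshot(rest_body) + [from_stream_event("a,21.0\n")]
for u in updates:
    print(u.node, u.lane, u.value)
```

Once every source produces the same record shape, downstream code no longer needs to know whether a value was polled or streamed, which is the property that Web Agents provide in a stateful form.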
Building Ingress Bridges From REST APIs or Data Streams
To get any kind of data to a Web Agent, you need to set up an ingress bridge. Ingress bridges can either push data into Swim or pull data from an external data source, such as a database. Below, we’ll cover building ingress bridges that push data into Swim, but you can learn about building bridges that pull from databases or other sources here.
Writes to a Swim server are most easily accomplished using a Swim client instance, but doing so requires one of the following:
1. The data source itself is written in either Java or JavaScript (currently the only two languages that support Swim clients)
2. The data source pushes messages, using any networking protocol of your choice, to a different Java/JavaScript process, which then uses a Swim client to relay data to the Swim server
Note that the second is simply the first with an intermediary process. Either way, the process that talks directly to the Swim server updates the server by either sending commands or writing to downlinks.
Here’s how to set up a push ingress bridge using Swim:
// swim/basic/SwimWriter.java
package swim.basic;
import swim.client.ClientRuntime;
import swim.structure.Value;
public class SwimWriter {
  // example usage (Text is swim.structure.Text):
  //   new SwimWriter()
  //       .generateOnce("warp://localhost:9001", "/unit/foo", "publish",
  //           Text.from("PushOption1"));
  // Note that this `SwimWriter` wrapper class is mostly just for illustration;
  // there is nothing wrong with operating directly on `ClientRuntime` instances
  private final ClientRuntime swim;
  public SwimWriter() {
    this.swim = new ClientRuntime();
    this.swim.start();
  }
  // Sends a single command with payload `v` to the given lane
  public void generateOnce(String host, String node, String lane, Value v) {
    this.swim.command(host, node, lane, v);
  }
}
Building Ingress Bridges From Websockets
You can also create an ingress bridge using websockets. Because WARP is built on top of websockets, sending the right websocket messages in the right order, even without a proper Swim handle, can trigger actions on a Swim server. Downlinks are near-impossible to instantiate in this manner, but sending commands is very simple (by design, because this is how we want non-Swim clients to write to Swim).
Commanding a lane without a Swim client just requires two steps:
1. Open a websocket connection to the desired server's hostUri
2. Write a string of the form @command(node:"%n",lane:"%l")%p through this connection, where %n is the desired nodeUri, %l is the laneUri, and %p is the payload
Consequently, this kind of ingress bridge can be written in any language that supports websockets. For example, here's what it looks like in Python:
# data_generator.py
# Prereq: install websocket-client: https://github.com/websocket-client/websocket-client
from websocket import create_connection
ws = create_connection('ws://localhost:9001')
# all parameters are strings; `host` is unused here because `ws` is already connected
def generate_once(host, node, lane, v):
    # node and lane are wrapped in double quotes, matching the message form above
    message = '@command(node:"{}",lane:"{}"){}'.format(node, lane, v)
    # equivalent old-school syntax:
    # message = '@command(node:"%s",lane:"%s")%s' % (node, lane, v)
    ws.send(message)
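The same websocket approach extends naturally to a REST source: poll the API, then send one command per record. The following is a minimal sketch under stated assumptions, not part of the original Swim examples. The endpoint URL, the `id` and `count` fields, the `/unit/<id>` node naming, and the names `build_command`, `fetch_json`, and `relay_once` are all hypothetical, and the bare numeric payload assumes the target lane accepts a number. The fetch and send functions are passed in as arguments so the bridge logic can be exercised without a network.

```python
import json
import urllib.request


def build_command(node, lane, payload):
    """Format a command message in the @command(node:"...",lane:"...") form."""
    return '@command(node:"{}",lane:"{}"){}'.format(node, lane, payload)


def fetch_json(url):
    """GET a URL and decode its JSON body (standard library only)."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


def relay_once(fetch, send, url, lane):
    """Poll the REST source once and push one command per record.

    `fetch` and `send` are injected; in practice they could be
    `fetch_json` and the `send` method of an open websocket connection.
    Returns the number of messages sent.
    """
    sent = 0
    for record in fetch(url):
        node = "/unit/" + str(record["id"])  # hypothetical node naming
        send(build_command(node, lane, record["count"]))
        sent += 1
    return sent


def canned(url):
    """Stand-in for a REST response, used for the dry run below."""
    return [{"id": "foo", "count": 3}, {"id": "bar", "count": 7}]


# Dry run: a list's append method stands in for the websocket's send.
outbox = []
relay_once(canned, outbox.append, "http://localhost:8080/counts", "publish")
print("\n".join(outbox))
```

Calling `relay_once` on a timer with `fetch_json` and a live connection's `send` would turn a stateless REST feed into a sequence of pushes to the server.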
Creating Aggregations Using Web Agents
Fundamentally, the way to create aggregations between Web Agents is by using downlinks. Using downlinks, you can create aggregator Web Agents which subscribe to multiple data sources. This method works regardless of whether the original source was a REST API, Kafka instance, Kubernetes pod, or data stream.
Here’s how to create a downlink between Web Agents:
// swim/basic/ListenerAgent.java
package swim.basic;
import swim.api.agent.AbstractAgent;
import swim.api.downlink.MapDownlink;
import swim.structure.Form;
public class ListenerAgent extends AbstractAgent {
  // Subscribes to the shoppingCart lane of /unit/0 and logs every update
  private void openSubscription() {
    final MapDownlink<String, Integer> downlink = downlinkMap()
        .keyForm(Form.forString()).valueForm(Form.forInteger())
        .nodeUri("/unit/0").laneUri("shoppingCart")
        .didUpdate((k, n, o) -> {
          logMessage("downlink: " + k + " count changed to " + n + " from " + o);
        })
        .open();
  }
  private void logMessage(Object msg) {
    System.out.println(nodeUri() + ": " + msg);
  }
}
Client-side, downlinks must be issued from a ClientRuntime, but the builder syntax is otherwise identical:
// swim/basic/CustomClient.java
package swim.basic;
import swim.api.downlink.MapDownlink;
import swim.client.ClientRuntime;
import swim.structure.Form;
class CustomClient {
  public static void main(String[] args) throws InterruptedException {
    ClientRuntime swimClient = new ClientRuntime();
    swimClient.start();
    final String hostUri = "warp://localhost:9001";
    final String nodeUriPrefix = "/unit/";
    // map downlink example
    final MapDownlink<String, Integer> link = swimClient.downlinkMap()
        .keyForm(Form.forString()).valueForm(Form.forInteger())
        .hostUri(hostUri).nodeUri(nodeUriPrefix + "0").laneUri("shoppingCart")
        .keepLinked(false)
        .open();
    // Remember that downlinks can write, too!
    link.put("FromClientLink", 25);
  }
}
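The downlink examples above only log or write a single value, so it may help to see what an aggregator does with updates from several sources. The sketch below is a plain-Python stand-in, not a Swim agent: `CartAggregator` and its `did_update` method are hypothetical names that model a callback like `didUpdate`, with the source node passed in explicitly. It keeps the latest map per source and a running total per key across all sources.

```python
from collections import defaultdict


class CartAggregator:
    """Tracks the latest value per (source, key) and a total per key."""

    def __init__(self):
        self.sources = defaultdict(dict)  # source node -> {key: count}
        self.totals = defaultdict(int)    # key -> count summed over sources

    def did_update(self, source, key, new):
        # Adjust the total by the change since this source's last value,
        # so repeated updates from one source are not double counted.
        previous = self.sources[source].get(key, 0)
        self.sources[source][key] = new
        self.totals[key] += new - previous


agg = CartAggregator()
agg.did_update("/unit/0", "apples", 2)
agg.did_update("/unit/1", "apples", 5)
agg.did_update("/unit/0", "apples", 3)  # /unit/0 changes from 2 to 3
print(dict(agg.totals))
```

Updating the total incrementally, rather than re-summing every source on each change, keeps the cost of an update independent of the number of sources being aggregated.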
Learn More
Hopefully that was a useful introduction to creating real-time aggregations using the open source Swim platform. You can get started with Swim here or check us out on GitHub.
Published at DZone with permission of Bradley Johnson. See the original article here.