
Streaming and Transforming Data with Apache Ignite



In its 1.0 release, Apache Ignite significantly improved its streaming support, adding the ability to perform various data transformations and to query the streamed data with standard SQL. Streaming in Ignite is generally used to ingest large, continuous volumes of data into Ignite distributed caches (possibly configured with sliding windows). Streamers can also be used simply to preload large amounts of data into caches on startup.
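As a sketch of the sliding-window idea mentioned above: one common way to approximate a time-based window is a standard JCache expiry policy, so that streamed entries drop out of the cache a fixed interval after creation. The cache name "numbers" matches the streamer example below; the 5-second window is an arbitrary choice for illustration.

```java
import java.util.concurrent.TimeUnit;

import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;

public class SlidingWindowConfig {
    public static void main(String[] args) {
        Ignite ignite = Ignition.start();

        CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("numbers");

        // Entries expire 5 seconds after creation, approximating a
        // 5-second sliding window over the stream.
        cfg.setExpiryPolicyFactory(
            CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 5)));

        // Register the key/value types so the cache is queryable with SQL.
        cfg.setIndexedTypes(Integer.class, Long.class);

        IgniteCache<Integer, Long> cache = ignite.getOrCreateCache(cfg);
    }
}
```

This is a configuration sketch, not a complete deployment; in a real cluster the node would be started with an IgniteConfiguration matching the rest of the topology.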

Here is an example of processing a stream of random numbers.

  1. The stream gets partitioned across multiple cluster nodes in such a way that the same numbers are always processed on the same node. 
  2. Upon receiving a number, our StreamTransformer will get the current count for that number and increment it by 1.
// "ignite" is an already-started Ignite instance; RAND is a shared java.util.Random.
try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer("numbers")) {
    // Allow existing cache entries to be updated.
    stmr.allowOverwrite(true);
 
    // Configure data transformation to count random numbers 
    // added to the stream.
    stmr.receiver(StreamTransformer.from((e, arg) -> {
        // Get the current count.
        Long val = e.getValue();
 
        // Increment the count by 1.
        e.setValue(val == null ? 1L : val + 1);
 
        return null;
    }));
 
    // Stream 10 million random numbers in the range of 0 to 999.
    for (int i = 1; i <= 10_000_000; i++) {
        stmr.addData(RAND.nextInt(1000), 1L);
 
        if (i % 500_000 == 0)
            System.out.println("Number of tuples streamed into Ignite: " + i);
    }
}
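The transformer above is easier to reason about without a cluster. The following plain-Java sketch (NumberCounter is a hypothetical class name, not part of the Ignite API) performs the same read-increment-write counting against a local map instead of a distributed cache:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class NumberCounter {
    private final Map<Integer, Long> counts = new HashMap<>();

    // Equivalent of the receiver logic: read the current count for
    // the number and increment it by 1 (starting from zero).
    public void add(int number) {
        counts.merge(number, 1L, Long::sum);
    }

    public long countOf(int number) {
        return counts.getOrDefault(number, 0L);
    }

    public static void main(String[] args) {
        NumberCounter counter = new NumberCounter();
        Random rand = new Random();

        // Locally simulate the stream of random numbers.
        for (int i = 0; i < 100_000; i++)
            counter.add(rand.nextInt(1000));

        System.out.println("Count of 42: " + counter.countOf(42));
    }
}
```

In Ignite the same logic runs collocated with the data: because the stream is partitioned by key, each node only ever updates the counts it owns.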

As we stream data into the system, we can also query it using standard SQL. Ignite treats the value type name as the SQL table name (in the example below it is "Long"). For the cache to be queryable with SQL, the key and value types typically need to be registered as indexed types, e.g. via CacheConfiguration.setIndexedTypes(Integer.class, Long.class).

In the query below, we select the 10 most popular numbers out of the stream.

// Query the top 10 most popular numbers.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10");
 
// Execute the query and get the whole result set.
// ("stmCache" is the same "numbers" cache the streamer writes to.)
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
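To make the semantics of that query concrete, here is a plain-Java sketch (no Ignite required; the TopN class name is hypothetical) that computes the same ranking over an in-memory count map, i.e. sort by count descending and keep the first n entries:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopN {
    // Sort entries by value descending and keep the first n,
    // the same result "order by _val desc limit 10" produces in SQL.
    public static List<Map.Entry<Integer, Long>> topN(Map<Integer, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

The difference, of course, is that Ignite evaluates the query across the cluster against live, continuously updated data rather than a local snapshot.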



Topics:
java, bigdata, framework

Published at DZone with permission of

