Streaming and Transforming Data with Apache Ignite

In its 1.0 release, Apache Ignite added much better streaming support, with the ability to perform various data transformations on the streamed data and to query it using standard SQL. Streaming in Ignite is generally used to ingest large, continuous volumes of data into Ignite distributed caches (possibly configured with sliding windows). Streamers can also be used simply to preload large amounts of data into caches on startup.
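The sliding-window idea can be illustrated without a cluster. The sketch below is plain Java, not the Ignite API: a hypothetical `SlidingWindow` map keeps only the most recent `maxEntries` insertions and evicts the oldest one first, which is roughly what a size-based FIFO eviction policy does to a cache. Time-based windows would evict by age instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only (not the Ignite API): a count-based sliding
// window that keeps just the most recent maxEntries keys and evicts the
// oldest insertion first, similar in spirit to a FIFO eviction policy.
final class SlidingWindow<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    SlidingWindow(int maxEntries) {
        // false = insertion order, so the "eldest" entry is the oldest insert.
        super(16, 0.75f, false);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put; returning true evicts the oldest entry.
        return size() > maxEntries;
    }
}

final class SlidingWindowDemo {
    public static void main(String[] args) {
        SlidingWindow<Integer, String> window = new SlidingWindow<>(3);
        for (int i = 1; i <= 5; i++) {
            window.put(i, "event-" + i);
        }
        System.out.println(window.keySet()); // [3, 4, 5]
    }
}
```

Once the window is full, each new insertion pushes out the oldest one, so memory stays bounded no matter how long the stream runs.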

Here is an example that processes a stream of random numbers:

  1. The stream is partitioned across multiple cluster nodes in such a way that the same number is always processed on the same node.
  2. Upon receiving a number, our StreamTransformer gets the current count for that number and increments it by 1.
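// Assumes "ignite" is a started Ignite instance, RAND is a java.util.Random,
// and the "numbers" cache (Integer keys, Long counts) has already been created.
try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer("numbers")) {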
    // Allow data updates.
    stmr.allowOverwrite(true);
 
    // Configure data transformation to count random numbers 
    // added to the stream.
    stmr.receiver(StreamTransformer.from((e, arg) -> {
        // Get current count.
        Long val = e.getValue();
 
        // Increment count by 1.
        e.setValue(val == null ? 1L : val + 1);
 
        return null;
    }));
 
    // Stream 10 million random numbers in the range 0 to 999.
    for (int i = 1; i <= 10_000_000; i++) {
        stmr.addData(RAND.nextInt(1000), 1L);
 
        if (i % 500_000 == 0)
            System.out.println("Number of tuples streamed into Ignite: " + i);
    }
}
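To make the two steps above concrete without a cluster, here is a single-JVM simulation in plain Java. It is a sketch, not Ignite code: the hypothetical `PartitionedCounter` class models each "node" as a `HashMap` and routes a key with `Math.floorMod(hashCode, nodeCount)`, a simplification of Ignite's affinity function, which maps keys to partitions and partitions to nodes. The `add` method applies the same read-then-increment logic as the StreamTransformer above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Single-JVM simulation of the two streaming steps (plain Java, not Ignite code).
// Assumption: each "node" is a HashMap, and a key is routed with
// Math.floorMod(hashCode, nodeCount) as a stand-in for Ignite's affinity function.
final class PartitionedCounter {
    private final List<Map<Integer, Long>> nodes = new ArrayList<>();

    PartitionedCounter(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new HashMap<>());
        }
    }

    // Step 1: routing depends only on the key, so the same number
    // always lands on the same node.
    int nodeFor(int key) {
        return Math.floorMod(Integer.hashCode(key), nodes.size());
    }

    // Step 2: read the current count for the number and increment it by 1,
    // mirroring the StreamTransformer logic in the article.
    void add(int key) {
        Map<Integer, Long> node = nodes.get(nodeFor(key));
        Long val = node.get(key);
        node.put(key, val == null ? 1L : val + 1);
    }

    long count(int key) {
        return nodes.get(nodeFor(key)).getOrDefault(key, 0L);
    }

    // Sum of all counts across all nodes (equals the number of add calls).
    long total() {
        long sum = 0;
        for (Map<Integer, Long> node : nodes) {
            for (long c : node.values()) {
                sum += c;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        PartitionedCounter counter = new PartitionedCounter(4);
        Random rand = new Random(42);
        for (int i = 0; i < 100_000; i++) {
            counter.add(rand.nextInt(1000));
        }
        System.out.println("Total counted: " + counter.total()); // 100000
        System.out.println("Count for 7: " + counter.count(7));
    }
}
```

Because routing depends only on the key, all updates for a given number land in one map, so no two nodes ever hold competing counts for the same number.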

While data is being streamed into the system, we can also query it using standard SQL. The value type name ("Long" in the example below) is treated as the table name.

In the query below, we select the 10 most popular numbers from the stream.

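// Query the top 10 most popular numbers. Assumes stmCache is the
// IgniteCache<Integer, Long> for the "numbers" cache, configured with
// setIndexedTypes(Integer.class, Long.class) so "Long" is queryable as a table.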
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10");
 
// Execute query and get the whole result set.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
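For readers less familiar with Ignite's SQL, the following plain-Java sketch (not Ignite code) computes the same result as the query above over an in-memory `Map<Integer, Long>` of number to count. The `TopN` class and its `top` method are hypothetical names used only for illustration.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java equivalent of:
//   select _key, _val from Long order by _val desc limit 10
// over an in-memory map of number -> count (not Ignite code).
final class TopN {
    static List<Map.Entry<Integer, Long>> top(Map<Integer, Long> counts, int n) {
        return counts.entrySet().stream()
                // order by _val desc
                .sorted(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()))
                // limit n
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(1, 5L);
        counts.put(2, 42L);
        counts.put(3, 17L);
        System.out.println(top(counts, 2)); // [2=42, 3=17]
    }
}
```

As with the SQL version, numbers with equal counts may come back in any order.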

Published at DZone with permission of Dmitriy Setrakyan, DZone MVB. See the original article here.