
Streaming and Transforming Data with Apache Ignite


In its 1.0 release, Apache Ignite added much better streaming support, with the ability to perform various data transformations and to query the streamed data using standard SQL. Streaming in Ignite is generally used to ingest large, continuous volumes of data into Ignite distributed caches (optionally configured with sliding windows). Streamers can also be used simply to preload large amounts of data into caches on startup.

Here is an example of processing a stream of random numbers.

  1. The stream is partitioned across multiple cluster nodes in such a way that the same number is always processed on the same node.
  2. Upon receiving a number, our StreamTransformer gets the current count for that number and increments it by 1.

// Assumes `ignite` is a started Ignite instance, a cache named "numbers" exists,
// and RAND is a java.util.Random.
try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer("numbers")) {
    // Allow data updates.
    stmr.allowOverwrite(true);
 
    // Configure data transformation to count random numbers 
    // added to the stream.
    stmr.receiver(StreamTransformer.from((e, arg) -> {
        // Get current count.
        Long val = e.getValue();
 
        // Increment count by 1.
        e.setValue(val == null ? 1L : val + 1);
 
        return null;
    }));
 
    // Stream 10 million random numbers in the range 0 (inclusive) to 1000 (exclusive).
    for (int i = 1; i <= 10_000_000; i++) {
        stmr.addData(RAND.nextInt(1000), 1L);
 
        if (i % 500_000 == 0)
            System.out.println("Number of tuples streamed into Ignite: " + i);
    }
}
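
To make the two steps above concrete without a running cluster, here is a minimal plain-Java sketch. It does not use Ignite at all: the class `PartitionedCounterSketch`, its `nodeFor` routing rule (hash modulo node count), and the in-memory maps standing in for cluster nodes are all assumptions made for illustration, not how Ignite actually assigns keys to nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Plain-Java illustration of key-based partitioning plus per-key counting (no Ignite involved). */
final class PartitionedCounterSketch {
    // Each map stands in for the local cache of one hypothetical cluster node.
    private final List<Map<Integer, Long>> nodes = new ArrayList<>();

    PartitionedCounterSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++)
            nodes.add(new HashMap<>());
    }

    /** Hypothetical routing rule: the same key always maps to the same node index. */
    int nodeFor(int key) {
        return Math.floorMod(Integer.hashCode(key), nodes.size());
    }

    /** Mirrors the transformer: read the current count for the key, then store count + 1. */
    void add(int key) {
        Map<Integer, Long> node = nodes.get(nodeFor(key));
        Long val = node.get(key);
        node.put(key, val == null ? 1L : val + 1);
    }

    /** Returns the count recorded for a key, or 0 if the key was never streamed. */
    long count(int key) {
        return nodes.get(nodeFor(key)).getOrDefault(key, 0L);
    }

    /** Sums the counts held by all nodes. */
    long total() {
        long sum = 0;
        for (Map<Integer, Long> node : nodes)
            for (long c : node.values())
                sum += c;
        return sum;
    }

    public static void main(String[] args) {
        PartitionedCounterSketch sketch = new PartitionedCounterSketch(4);
        Random rand = new Random();

        // Stream random numbers in the range 0 (inclusive) to 1000 (exclusive).
        for (int i = 0; i < 100_000; i++)
            sketch.add(rand.nextInt(1000));

        System.out.println("Total tuples counted: " + sketch.total());
    }
}
```

Because every occurrence of a given number lands on the same node, the read-then-increment step only ever touches one node's data for that key, which is the property the partitioning in step 1 is meant to provide.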

While the data is streaming into the system, we can also query it using standard SQL. In this case, the name of the value type ("Long" in the example below) is treated as the table name.

In the query below, we select the 10 most popular numbers from the stream.

// Query the top 10 most popular numbers.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10");
 
// Execute the query and get the whole result set.
// Assumes stmCache is the IgniteCache<Integer, Long> backing the "numbers" cache.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
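
For readers who want to see what the query computes, the following plain-Java sketch applies the same "order by value descending, limit N" logic to an ordinary in-memory map. The class `TopNSketch` and its `topN` method are hypothetical names introduced here for illustration; this is not how Ignite executes SQL.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java equivalent of "select _key, _val ... order by _val desc limit n" over a map. */
final class TopNSketch {
    /** Returns up to n entries with the largest values, highest value first. */
    static List<Map.Entry<Integer, Long>> topN(Map<Integer, Long> counts, int n) {
        return counts.entrySet().stream()
            .sorted(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()))
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample counts; the keys play the role of _key and the values the role of _val.
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(17, 42L);
        counts.put(256, 7L);
        counts.put(999, 19L);

        for (Map.Entry<Integer, Long> e : topN(counts, 2))
            System.out.println(e.getKey() + " -> " + e.getValue());
    }
}
```

Entries with equal counts may come back in any relative order, just as the SQL query does not specify a tie-break.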



Published at DZone with permission of

