Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Streaming and Transforming Data with Apache Ignite

DZone's Guide to

Streaming and Transforming Data with Apache Ignite

· Java Zone
Free Resource

Learn how to troubleshoot and diagnose some of the most common performance issues in Java today. Brought to you in partnership with AppDynamics.

In its 1.0 release Apache Ignite added much better streaming support with ability to perform various data transformations, as well as query the streamed data using standard SQL queries. Streaming in Ignite is generally used to ingest continuous large volumes of data into Ignite distributed caches (possibly configured with sliding windows). Streamers can also be used to simply preload large amounts of data into caches on startup.

Here is an example of processing a stream of random numbers.

  1. The stream gets partitioned to multiple cluster nodes in such a way that same numbers will always be processed on the same node. 
  2. Upon receiving a number, our StreamTransformer will get the current count for that number and increment it by 1.
try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer("numbers")) {
    // Allow data updates.
    stmr.allowOverwrite(true);
 
    // Configure data transformation to count random numbers 
    // added to the stream.
    stmr.receiver(StreamTransformer.from((e, arg) -> {
        // Get current count.
        Long val = e.getValue();
 
        // Increment count by 1.
        e.setValue(val == null ? 1L : val + 1);
 
        return null;
    }));
 
    // Stream 10 million of random numbers in the range of 0 to 1000.
    for (int i = 1; i <= 10_000_000; i++) {
        stmr.addData(RAND.nextInt(1000), 1L);
 
        if (i % 500_000 == 0)
            System.out.println("Number of tuples streamed into Ignite: " + i);
    }
}

As we are streaming the data into the system, we can also query it using standard SQL. In this case, the data type name (in the example below it is "Long") is treated as a table name. 

In the query below, we select 10 most popular numbers out of the stream.

// Query top 10 most popular numbers every.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10");
 
// Execute query and get the whole result set.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();


Understand the needs and benefits around implementing the right monitoring solution for a growing containerized market. Brought to you in partnership with AppDynamics.

Topics:
java ,bigdata ,framework

Published at DZone with permission of Dmitriy Setrakyan, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

SEE AN EXAMPLE
Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.
Subscribe

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}