
JVM Advent Calendar: Frameworks for Big Data Processing in Java


Check out this post to learn more about the top libraries and frameworks for big data processing in Java.



The concept of big data is understood differently across the variety of domains where companies face the need to deal with increasing volumes of data. In most of these scenarios, the system under consideration needs to be designed in such a way that it is capable of processing that data without sacrificing throughput as the data grows in size.

This essentially leads to the necessity of building systems that are highly scalable so that more resources can be allocated based on the volume of data that needs to be processed at a given point in time.

You may also like: Big Data Building Blocks: Selecting Architectures and Open-Source Frameworks

Building such a system is a time-consuming and complex activity, and for that reason, third-party frameworks and libraries can be used to provide the scalability requirements out of the box. There are already a number of good choices that can be used in Java applications; in this article, we will briefly discuss some of the most popular ones:



The Frameworks in Action

We are going to demonstrate each of the frameworks by implementing a simple pipeline for processing data from devices that measure the air quality index for a given area. For simplicity, we will assume that the numeric data from the devices is received either in batches or in a streaming fashion. Throughout the examples, we are going to use the THRESHOLD constant to denote the value above which we consider an area to be polluted.

Apache Spark

In Spark, we first need to convert the data into a proper format. We are going to use Datasets, but we can also choose DataFrames or RDDs (Resilient Distributed Datasets) as an alternative data representation. We can then apply a number of Spark transformations and actions in order to process the data in a distributed fashion.

Java

public long countPollutedRegions(String[] numbers) {
    // runs a Spark master that takes up 4 cores
    SparkSession session = SparkSession.builder()
            .appName("AirQuality")
            .master("local[4]")
            .getOrCreate();

    // converts the array of numbers to a Spark dataset
    Dataset<String> numbersSet = session.createDataset(Arrays.asList(numbers),
            Encoders.STRING());

    // runs the data pipeline on the local Spark instance
    long pollutedRegions = numbersSet
            .map(number -> Integer.valueOf(number), Encoders.INT())
            .filter(number -> number > THRESHOLD)
            .count();

    return pollutedRegions;
}



If we want to change the above application to read data from an external source, write to an external data source, and run it on a Spark cluster rather than a local Spark instance, we would have the following execution flow:



The Spark driver might be either a separate instance or part of the Spark cluster.
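As an illustrative sketch of such a variant (the input and output paths are assumptions for the example, and in practice the master URL would typically be supplied via spark-submit rather than hard-coded), the pipeline could be adapted like this:

```java
// hypothetical file-based variant of the example above; assumes a
// SparkSession created without a hard-coded local master
public long countPollutedRegions(SparkSession session,
        String inputPath, String outputPath) {
    // reads the numbers from an external source, one number per line
    Dataset<String> numbersSet = session.read().textFile(inputPath);

    // the same transformations as before, now over external data
    Dataset<Integer> polluted = numbersSet
            .map(number -> Integer.valueOf(number), Encoders.INT())
            .filter(number -> number > THRESHOLD);

    // writes the polluted readings to an external sink
    polluted.write().csv(outputPath);
    return polluted.count();
}
```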

Apache Flink

Similarly to Spark, we need to represent the data as a Flink DataStream and then apply the necessary transformations and actions over it:

Java

public long countPollutedRegions(String[] numbers) throws Exception {
    // creates a Flink execution environment with proper configuration
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironment();

    // converts the array of numbers to a Flink data stream and creates
    // the data pipeline
    DataStream<Integer> stream = env.fromCollection(Arrays.asList(numbers))
            .map(number -> Integer.valueOf(number))
            .filter(number -> number > THRESHOLD)
            .returns(Integer.class);

    long pollutedRegions = 0;
    Iterator<Integer> numbersIterator = DataStreamUtils.collect(stream);
    while (numbersIterator.hasNext()) {
        pollutedRegions++;
        numbersIterator.next();
    }
    return pollutedRegions;
}



If we want to change the above application to read data from an external source, write to an external data source, and run it on a Flink cluster, we would have the following execution flow:



The Flink client where the application is submitted to the Flink cluster is either the Flink CLI utility or JobManager's UI.
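For illustration only (the file paths and the job name here are hypothetical), such a file-based variant could be sketched as follows:

```java
// hypothetical file-based variant; the environment is resolved from the
// cluster the job is submitted to, rather than created locally
public void countPollutedRegions(String inputPath, String outputPath)
        throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // reads the numbers from a file, filters the polluted readings,
    // and writes them to an external sink
    env.readTextFile(inputPath)
            .map(number -> Integer.valueOf(number))
            .returns(Integer.class)
            .filter(number -> number > THRESHOLD)
            .writeAsText(outputPath);

    // triggers the actual execution of the pipeline
    env.execute("airquality-job");
}
```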

Apache Storm

In Storm, the data pipeline is created as a topology of Spouts (the sources of data) and Bolts (the data processing units). Since Storm typically processes unbounded streams of data, we will emulate the processing of an array of air quality index numbers as a bounded stream:

Java

public void countPollutedRegions(String[] numbers) throws Exception {
    // builds the topology as a combination of spouts and bolts
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("numbers-spout", new StormAirQualitySpout(numbers));
    builder.setBolt("number-bolt", new StormAirQualityBolt())
            .shuffleGrouping("numbers-spout");

    // prepares the Storm config and, along with the topology, submits it
    // for execution to a local Storm cluster
    Config conf = new Config();
    conf.setDebug(true);
    LocalCluster localCluster = null;
    try {
        localCluster = new LocalCluster();
        localCluster.submitTopology("airquality-topology",
                conf, builder.createTopology());
        Thread.sleep(10000);
        localCluster.shutdown();
    } catch (InterruptedException ex) {
        localCluster.shutdown();
    }
}



We have one spout that provides a data source for the array of air quality index numbers and one bolt that filters only the ones that indicate polluted areas:

Java

public class StormAirQualitySpout extends BaseRichSpout {

    private boolean emitted = false;

    private SpoutOutputCollector collector;

    private String[] numbers;

    public StormAirQualitySpout(String[] numbers) {
        this.numbers = numbers;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    @Override
    public void open(Map params,
            TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // we make sure that the numbers array is processed just once by
        // the spout
        if (!emitted) {
            for (String number : numbers) {
                collector.emit(new Values(number));
            }
            emitted = true;
        }
    }
}



Java

public class StormAirQualityBolt extends BaseRichBolt {

    private static final int THRESHOLD = 10;

    private int pollutedRegions = 0;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    @Override
    public void prepare(Map params,
            TopologyContext context,
            OutputCollector collector) {
    }

    @Override
    public void execute(Tuple tuple) {
        String number = tuple.getStringByField("number");
        Integer numberInt = Integer.valueOf(number);
        if (numberInt > THRESHOLD) {
            pollutedRegions++;
        }
    }
}



We are using a LocalCluster instance for submitting to a local Storm cluster, which is convenient for development purposes. But if we want to submit the Storm topology to a production cluster, we would have the following execution flow:
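A minimal sketch of submitting the same topology to a production cluster (the topology name and worker count here are illustrative) would use StormSubmitter instead of LocalCluster:

```java
// hypothetical production-submission variant; the target cluster is
// resolved from the storm.yaml configuration on the submitting machine
public void submitToProductionCluster(String[] numbers) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("numbers-spout", new StormAirQualitySpout(numbers));
    builder.setBolt("number-bolt", new StormAirQualityBolt())
            .shuffleGrouping("numbers-spout");

    Config conf = new Config();
    // distributes the topology over two worker processes in the cluster
    conf.setNumWorkers(2);

    StormSubmitter.submitTopology("airquality-topology",
            conf, builder.createTopology());
}
```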



Apache Ignite

In Ignite, we first need to put the data in the distributed cache before running the data processing pipeline, which takes the form of an SQL query executed in a distributed fashion over the Ignite cluster:

Java

public long countPollutedRegions(String[] numbers) {
    IgniteConfiguration igniteConfig = new IgniteConfiguration();
    CacheConfiguration<Integer, String> cacheConfig =
            new CacheConfiguration<>();
    // the cache key is the number's index in the array and
    // the value is the number itself
    cacheConfig.setIndexedTypes(Integer.class, String.class);
    cacheConfig.setName(NUMBERS_CACHE);
    igniteConfig.setCacheConfiguration(cacheConfig);

    try (Ignite ignite = Ignition.start(igniteConfig)) {
        IgniteCache<Integer, String> cache =
                ignite.getOrCreateCache(NUMBERS_CACHE);
        // adds the numbers to the Ignite cache
        try (IgniteDataStreamer<Integer, String> streamer =
                ignite.dataStreamer(cache.getName())) {
            int key = 0;
            for (String number : numbers) {
                streamer.addData(key++, number);
            }
        }

        // performs an SQL query over the cached numbers
        SqlFieldsQuery query = new SqlFieldsQuery(
                "select * from String where _val > " + THRESHOLD);
        FieldsQueryCursor<List<?>> cursor = cache.query(query);

        int pollutedRegions = cursor.getAll().size();
        return pollutedRegions;
    }
}



If we want to run the application in an Ignite cluster, it will have the following execution flow:
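As a hedged sketch of that setup, the application could join the already-running cluster as a client node instead of starting another server node (the cache name and query mirror the example above):

```java
IgniteConfiguration igniteConfig = new IgniteConfiguration();
// joins the running Ignite cluster as a client node instead of
// starting another server node
igniteConfig.setClientMode(true);

try (Ignite ignite = Ignition.start(igniteConfig)) {
    IgniteCache<Integer, String> cache =
            ignite.getOrCreateCache(NUMBERS_CACHE);
    // the same SQL query as before now runs over data held by the
    // server nodes of the cluster
    SqlFieldsQuery query = new SqlFieldsQuery(
            "select * from String where _val > " + THRESHOLD);
    long pollutedRegions = cache.query(query).getAll().size();
}
```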



Hazelcast Jet

Hazelcast Jet works on top of Hazelcast IMDG, and similarly to Ignite, if we want to process data, we need first to put it in the Hazelcast IMDG cluster:

Java

public long countPollutedRegions(String[] numbers) {
    // prepares the Jet data processing pipeline
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.list("numbers"))
        .map(number -> Integer.valueOf((String) number))
        .filter(number -> number > THRESHOLD)
        .drainTo(Sinks.list("filteredNumbers"));

    JetInstance jet = Jet.newJetInstance();
    IList<String> numbersList = jet.getList("numbers");
    numbersList.addAll(Arrays.asList(numbers));

    try {
        // submits the pipeline to the Jet cluster
        jet.newJob(p).join();

        // gets the filtered data from Hazelcast IMDG
        List<Integer> filteredRecordsList = jet.getList("filteredNumbers");
        int pollutedRegions = filteredRecordsList.size();

        return pollutedRegions;
    } finally {
        Jet.shutdownAll();
    }
}



Note, however, that Jet also provides integration with external data sources, so the data does not need to be stored in the IMDG cluster. You can also do the aggregation without first storing the data in a list (review the full example on GitHub, which contains the improved version). Thanks to Jaromir and Can from the Hazelcast engineering team for the valuable input!
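One possible shape of such an aggregation (a sketch using the same pipeline API as above, not necessarily identical to the improved GitHub version) counts the polluted readings directly instead of draining them to an intermediate list:

```java
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("numbers"))
        .map(number -> Integer.valueOf((String) number))
        .filter(number -> number > THRESHOLD)
        // counts the matching readings inside the pipeline itself,
        // without materializing a "filteredNumbers" list first
        .aggregate(AggregateOperations.counting())
        .drainTo(Sinks.list("pollutedRegionsCount"));
```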

If we want to run the application in a Hazelcast Jet cluster, it will have the following execution flow:



Kafka Streams

Kafka Streams is a client library that uses Kafka topics as sources and sinks for the data processing pipeline. To make use of the Kafka Streams library for our scenario, we would put the air quality index numbers in a numbers Kafka topic:

Java

public long countPollutedRegions() {
    List<String> result = new LinkedList<>();
    // key/value pairs contain string items
    final Serde<String> stringSerde = Serdes.String();

    // prepares and runs the data processing pipeline
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("numbers", Consumed.with(stringSerde, stringSerde))
            .map((key, value) -> new KeyValue<>(key, Integer.valueOf(value)))
            .filter((key, value) -> value > THRESHOLD)
            .foreach((key, value) -> {
                result.add(value.toString());
            });

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology,
            createKafkaStreamsConfiguration());
    streams.start();

    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int pollutedRegions = result.size();
    System.out.println("Number of severely polluted regions: " + pollutedRegions);
    streams.close();
    return pollutedRegions;
}

private Properties createKafkaStreamsConfiguration() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-search-config");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    return props;
}
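For completeness, a sketch of how the numbers might be published to the numbers topic with a plain Kafka producer (the broker address is an assumption matching the configuration above):

```java
// hypothetical helper for populating the "numbers" topic; assumes a
// broker reachable at localhost:9092
public void publishNumbers(String[] numbers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (String number : numbers) {
            // each air quality index value is sent as the record value
            producer.send(new ProducerRecord<>("numbers", number));
        }
    }
}
```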



We will have the following execution flow for our Kafka Streams application instances:



Pulsar Functions

Apache Pulsar Functions are lightweight compute processes that work in a serverless fashion along with an Apache Pulsar cluster. Assuming we are streaming our air quality index in a Pulsar cluster, we can write a function to count the number of indexes that exceed the given threshold and write the result back to Pulsar as follows:

Java

public class PulsarFunctionsAirQualityApplication
        implements Function<String, Void> {

    private static final int HIGH_THRESHOLD = 10;

    @Override
    public Void process(String input, Context context) throws Exception {
        int number = Integer.valueOf(input);

        if (number > HIGH_THRESHOLD) {
            context.incrCounter("pollutedRegions", 1);
        }
        return null;
    }
}



The execution flow of the function along with a Pulsar cluster is the following:



The Pulsar function can run either in the Pulsar cluster or as a separate application.

Conclusion

In this article, we briefly reviewed some of the most popular frameworks that can be used to implement big data processing systems in Java. Each of the presented frameworks is fairly big and deserves a separate article of its own.

Although quite simple, our air quality index data pipeline demonstrates how these frameworks operate, and you can use it as a basis for expanding your knowledge in whichever of them interests you further.

You can review the complete code samples here.

This post was originally published as part of the Java Advent series. If you like it, please spread the word by sharing it on Twitter, Facebook, etc.!

Want to write for the Java Advent blog? We are looking for contributors to fill all 24 slots and would love to have your contribution! Contact the Java Advent Admin at contribute@javaadvent.com!

Further Reading

 Big Data Building Blocks: Selecting Architectures and Open-Source Frameworks

DZone Guide to Big Data: Volume, Variety, and Velocity

Topics:
big data, big data processing, java, jvm, processing

Published at DZone with permission of Martin Toshev . See the original article here.

Opinions expressed by DZone contributors are their own.
