A Step Towards a Centralized Streaming Hub

Ingesting and streaming data from edge devices is an exciting proposition for data scientists. Read on to learn how to do this with Apache Spark.

The idea of a Centralized Streaming Hub, or Next Generation Real-Time Data Processing tool, came up while I was trying to solve a typical IoT problem: too many streaming IoT data sources arrive in the cloud, and all of them need to be cleaned, processed, filtered, and monitored. There was also an interesting requirement to create a Dynamic Rule Engine on streaming data. The requirement covers typical ELT scenarios as well as putting the data into a stream where different business applications can use it. Nowadays, most clients are looking for streaming ingestion through Kafka, which typically offers high throughput and very low latency. What they want is a streaming hub where streaming data is readily available.

Let's first look at the suggested architecture:

[Image: suggested architecture diagram]

I am trying to create a platform that exposes my streaming data statistics, a real-time monitoring system, ingestion to a data lake, and commonly used monitoring systems. In this article, I will explain how to create a custom Kafka producer using Java. We will also take a deep dive into Spark 2.3's continuous processing trigger and see how custom data quality modules can help identify malformed records.

To get streaming sources, I have installed Kafka 0.10.2.1 on my local machine. I've also created a simple Java producer to push messages to different topics. Here is an example:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DemoKafkaProducer {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  // Wait for the partition leader to acknowledge each write.
  props.put("acks", "1");
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  // try-with-resources closes the producer on both success and failure.
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
   for (int i = 0; i <= 1000000; i++) {
    String msg = "<json string>"; // placeholder: supply the JSON payload here
    producer.send(new ProducerRecord<>("<topic name>", msg));
    System.out.println("successful");
    Thread.sleep(2000); // one message every two seconds
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

In the next step, I use structured streaming to read from these Kafka topics. A single Spark streaming job can consume data from multiple topics, but Tathagata Das, one of the creators of Spark Streaming, recommends a maximum of four. The Kafka dependency used here is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>

We are using Spark 2.3 here and reading directly from multiple streams. The subscribe option takes a comma-separated list of topic names:

var df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "<Topic names>").load()

Structure of the Reader Dataset

key (optional) and value are the fields that can be passed from the user's end. topic and partition identify the unique sources of the data. The offset uniquely identifies a message within a topic partition. The key point about structured streaming is that every query needs a writer stream. From Spark 2.3 onward, we get a trigger for continuous streaming, which brings latency down to the millisecond range (about 10 ms). In our case, we did some basic quality checks on the received messages and also checked whether our dynamic rule engine is sending any requirements. We split the stream into a data service and a dynamic rule service and loaded each into a different topic.

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
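|-- timestampType: integer (nullable = true)

import org.apache.spark.sql.functions.{lit, udf}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// dataValidator is the custom data quality function; wrap it as a UDF.
val recdValidator = udf(dataValidator _)

// lit() only accepts literal values, so rule_file_df must hold a literal (not a DataFrame).
df = df.withColumn("value", recdValidator(df("value"), df("topic"), lit(rule_file_df)))

// Split the validated stream by source topic.
var data_df = df.filter($"topic" === "<topic name>")

var rule_df = df.filter($"topic" === "<topic name>")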

These rules can be configured from a metadata table. Examples include null checks, improper junk characters, valid JSON records, and missing tags in JSON.
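The checks listed above could look roughly like the following plain Java sketch. It is not the article's dataValidator: the class name RecordQualityCheck, the failure codes, and the requiredTags parameter are all hypothetical, and the JSON test is only a shape check, where a real job would parse the record with a JSON library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of per-record quality checks; names are illustrative only. */
final class RecordQualityCheck {

    /** Returns the list of failed checks; an empty list means the record passed. */
    static List<String> validate(String value, List<String> requiredTags) {
        List<String> failures = new ArrayList<>();

        // Null check: nothing else can be tested on a missing or blank payload.
        if (value == null || value.trim().isEmpty()) {
            failures.add("NULL_OR_EMPTY");
            return failures;
        }

        // Junk characters: control characters other than tab, CR, and LF,
        // plus the Unicode replacement character left behind by bad decoding.
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            boolean control = Character.isISOControl(c) && c != '\t' && c != '\n' && c != '\r';
            if (control || c == '\uFFFD') {
                failures.add("JUNK_CHARACTER");
                break;
            }
        }

        // Shape check only (assumption): the payload should look like a JSON object.
        String trimmed = value.trim();
        if (!(trimmed.startsWith("{") && trimmed.endsWith("}"))) {
            failures.add("NOT_A_JSON_OBJECT");
        }

        // Missing tags: each required tag must appear as a quoted key.
        for (String tag : requiredTags) {
            if (!trimmed.contains("\"" + tag + "\"")) {
                failures.add("MISSING_TAG:" + tag);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("deviceId", "temperature");
        System.out.println(validate("{\"deviceId\":\"d1\",\"temperature\":21.5}", required));
        System.out.println(validate("{\"deviceId\":\"d1\"}", required));
    }
}
```

Returning a list of failure codes rather than a boolean keeps every failed check visible, which is useful if malformed records are to be reported by rule.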

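// The Kafka sink needs a checkpoint location; the "<... checkpoint dir>" values are placeholders.
val query = data_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .trigger(Trigger.Continuous("10 seconds"))
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "<data part>")
  .option("checkpointLocation", "<data checkpoint dir>")
  .start()

val query_rule = rule_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .trigger(Trigger.Continuous("10 seconds"))
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "<Rule part>")
  .option("checkpointLocation", "<rule checkpoint dir>")
  .start()

// start() has already launched both queries; block until either one stops.
spark.streams.awaitAnyTermination()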

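In Trigger.Continuous("10 seconds"), the argument is the checkpoint interval, not a batch interval: records are processed continuously, and the query checkpoints its progress every 10 seconds.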

This pushes our ingested data to centralized streams based on topic checks. The next part is ingesting the data into a data lake. We will then analyze the data live (in near real time) from a NoSQL store.
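To make the topic-based split concrete, here is a minimal plain Java sketch with no Spark or Kafka involved. The TopicRouter and Message names, and the "data" and "rule" stream labels, are assumptions made for illustration; in the actual job the same decision is expressed as filters on the topic column.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the topic-based split; names are illustrative only. */
final class TopicRouter {

    /** Minimal view of a Kafka record: only the fields the split needs. */
    static final class Message {
        final String topic;
        final String value;

        Message(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Sends values from ruleTopic to the "rule" stream and everything else to "data". */
    static Map<String, List<String>> route(List<Message> messages, String ruleTopic) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        out.put("data", new ArrayList<>());
        out.put("rule", new ArrayList<>());
        for (Message m : messages) {
            String target = m.topic.equals(ruleTopic) ? "rule" : "data";
            out.get(target).add(m.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> batch = new ArrayList<>();
        batch.add(new Message("sensor-a", "{\"temperature\":91}"));
        batch.add(new Message("rules", "{\"topic\":\"sensor-a\"}"));
        System.out.println(route(batch, "rules"));
    }
}
```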

The next step, the dynamic rule-based check, can be achieved either with structured streaming or with asynchronous streaming and a micro-batch model. The limitation of structured streaming (until Spark 2.4) was that it could not perform full outer joins. It only supported inner, left outer, and right outer joins based on the watermark condition. We need an inner join between the data topic and the rule topic to perform these operations. In short, each dynamic rule should arrive with a topic name that matches the name of the ingested data topic. This approach is limited to certain operations only. With asynchronous streams, however, we can perform all the possible batch operations and write the results to a NoSQL or time series database, such as Influx.
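The matching of rules to data by topic name can be sketched in plain Java as an in-memory inner join. This is only an illustration of the join semantics, not the Spark stream-stream join itself; the TopicRuleJoin class, the sample topic names, and the rule text are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of joining data to rules on the topic name. */
final class TopicRuleJoin {

    /**
     * Inner join of {topic, value} records with rules keyed by topic name.
     * Each output row is {topic, value, rule}; records without a rule are dropped.
     */
    static List<String[]> innerJoin(List<String[]> records, Map<String, String> rulesByTopic) {
        List<String[]> joined = new ArrayList<>();
        for (String[] record : records) {
            String rule = rulesByTopic.get(record[0]);
            if (rule != null) {
                joined.add(new String[] {record[0], record[1], rule});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> rules = new HashMap<>();
        rules.put("sensor-a", "temperature > 80");

        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"sensor-a", "{\"temperature\":91}"});
        records.add(new String[] {"sensor-b", "{\"temperature\":20}"});

        // Only the sensor-a record has a rule with the same topic name.
        for (String[] row : innerJoin(records, rules)) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```

Because the join is inner, data from a topic with no matching rule simply produces no output, which is why each rule has to carry the name of the topic it applies to.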

If we can create micro-batches of 30 to 60 seconds and pass our rules dynamically from a UI in a template format, we can report live from NoSQL. A dynamic rule engine demo will be posted in the next version of this article.

I hope this helps people use Spark streaming in a more interactive way.

Thanks, keep Sparking!
