A Step Towards a Centralized Streaming Hub

Ingesting and streaming data from edge devices is an exciting proposition for data scientists. Read on to learn how to do this with Apache Spark.

by Subhasish Guha · Nov. 28, 18 · Tutorial

The idea of a Centralized Streaming Hub, or next-generation real-time data processing tool, came up while I was trying to solve a typical IoT problem: too many streaming IoT data sources arrive in the cloud, and all of them need to be cleaned, processed, filtered, and monitored. There was also an interesting requirement to create a Dynamic Rule Engine on streaming data. The requirement also called for typical ELT scenarios and for putting the data into a stream where different business applications can use it. Nowadays, most clients are looking for streaming ingestion through Kafka, which typically offers high throughput and very low latency. Clients want a streaming hub where streaming data is readily available.

Let's first look at the suggested architecture:

[Figure: suggested architecture]

The goal is a platform that exposes streaming data statistics, supports real-time monitoring, ingests data into a data lake, and feeds widely used monitoring systems. In this article, I will explain how to create a custom Kafka producer using Java. We will then take a deep dive into Spark 2.3's continuous processing mode and see how custom data quality modules can help identify malformed records.

To get streaming sources, I installed Kafka 0.10.2.1 on my local machine. I also created a simple Java producer to push messages to different topics. Here is an example:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DemoKafkaProducer {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  // "acks" is the Java producer's name for the old "request.required.acks" setting
  props.put("acks", "1");
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  // try-with-resources closes the producer on normal exit and on error
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
   for (int i = 0; i <= 1000000; i++) {
    String msg = "<json string>"; // placeholder for the JSON payload
    producer.send(new ProducerRecord<>("<topic name>", msg));
    System.out.println("sent message " + i);
    Thread.sleep(2000); // one message every two seconds
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

In the next step, I use Structured Streaming to read from these Kafka topics. A single Spark streaming job can consume data from multiple topics, but Tathagata Das, one of the creators of Spark Streaming, recommends a maximum of four. The Kafka dependency is declared as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>

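We are using Spark 2.3 here and reading directly from multiple topics. Note that Spark's Kafka source also needs the spark-sql-kafka-0-10 connector on the classpath.

var df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "<Topic names>") // comma-separated list of topics
  .load()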

Structure of the Reader Dataset

key (optional) and value are the fields supplied by the user. topic and partition identify the source of the data, and offset identifies a message's position within its partition, so topic, partition, and offset together uniquely identify a message. A structured streaming query only runs once it has a writer stream attached. From Spark 2.3 onward, there is a trigger for continuous streaming that brings latency down to about 10 ms. In our case, we ran some basic quality checks on the received messages and also checked whether our dynamic rule engine was sending any requirements. We then split the records into data and dynamic rule streams and loaded them into different topics.

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
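|-- timestampType: integer (nullable = true)

import org.apache.spark.sql.functions.{lit, udf}
import spark.implicits._

// dataValidator is the custom data quality function (defined elsewhere)
val recdValidator = udf(dataValidator _)

// Note: lit() only accepts literal-compatible values, so rule_file_df must be
// a plain value such as a String here, not a DataFrame.
df = df.withColumn("value", recdValidator(df("value"), df("topic"), lit(rule_file_df)))

// Route records by their source topic
var data_df = df.filter($"topic" === "<topic name>") // topic carrying data records
var rule_df = df.filter($"topic" === "<topic name>") // topic carrying dynamic rules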

These validation rules can be configured from a metadata table. Examples include null checks, junk characters, valid JSON records, and missing tags in JSON.
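
The checks listed above could be sketched in plain Java as follows. This is a minimal illustration, not the dataValidator used in this article: the class name RecordQualityChecks, the validate method, and the sample tags deviceId and temperature are all hypothetical, and the JSON check is only a crude shape test where a real pipeline would use a JSON parser.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of per-record quality checks; all names are illustrative.
class RecordQualityChecks {

    // Returns a list of problems found; an empty list means the record passed.
    static List<String> validate(String value, List<String> requiredTags) {
        List<String> problems = new ArrayList<>();

        // Null check (blank payloads are treated as missing too).
        if (value == null || value.trim().isEmpty()) {
            problems.add("null or empty record");
            return problems;
        }

        // Junk character check: control characters other than tab, CR and LF.
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (Character.isISOControl(c) && c != '\t' && c != '\n' && c != '\r') {
                problems.add("junk character at index " + i);
                break;
            }
        }

        // Crude JSON shape check: the payload must be wrapped in braces.
        // Assumption: a real pipeline would use a JSON parser here instead.
        String trimmed = value.trim();
        if (!(trimmed.startsWith("{") && trimmed.endsWith("}"))) {
            problems.add("not a JSON object");
        }

        // Missing tag check: every required tag must appear as a quoted key.
        for (String tag : requiredTags) {
            if (!trimmed.contains("\"" + tag + "\"")) {
                problems.add("missing tag: " + tag);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("deviceId", "temperature");
        // prints []
        System.out.println(validate("{\"deviceId\":\"d1\",\"temperature\":21.5}", required));
        // prints [missing tag: temperature]
        System.out.println(validate("{\"deviceId\":\"d1\"}", required));
        // prints [null or empty record]
        System.out.println(validate(null, required));
    }
}
```

Returning a list of problems rather than a boolean makes it easy to route malformed records to a separate topic together with the reason they failed.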

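import org.apache.spark.sql.streaming.Trigger

val query = data_df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .trigger(Trigger.Continuous("10 seconds"))
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "<data part>")
  .option("checkpointLocation", "<data checkpoint dir>") // placeholder; the Kafka sink requires a checkpoint location
  .start()

val query_rule = rule_df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .trigger(Trigger.Continuous("10 seconds"))
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "<Rule part>")
  .option("checkpointLocation", "<rule checkpoint dir>") // placeholder; each query needs its own directory
  .start()

// start() has already launched both queries; block until one of them terminates
spark.streams.awaitAnyTermination()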

Trigger.Continuous("10 seconds") sets the checkpoint interval: records are processed continuously, and the engine records the query's progress every 10 seconds.

This pushes our ingested data to centralized streams, routed by topic. The next part is ingesting the data into a data lake, where we'll analyze it live (in near real time) using NoSQL.

The next step, the dynamic rule-based check, can be achieved either with structured streaming or with asynchronous streaming and a micro-batch model. A limitation of structured streaming (up to Spark 2.4) is that it cannot perform full outer joins between streams; it only supports inner, left outer, and right outer joins, subject to watermark conditions. We need an inner join between the data topic and the rule topic to perform these operations. In short, each dynamic rule should carry a topic name that matches the topic of the ingested data it applies to. This approach is limited to certain operations only. With asynchronous streams, however, we can perform all the usual batch operations and write the results to a NoSQL or time series database, such as InfluxDB.
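
To make the inner-join requirement concrete, here is a small plain-Java sketch of joining data records to rules on topic name. It only illustrates the join semantics with in-memory collections and is not Spark code; the class TopicRuleJoin, the Message type, and the sample topics and rule strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of an inner join between data records and rules,
// keyed on topic name. All names and sample values here are hypothetical.
class TopicRuleJoin {

    static final class Message {
        final String topic;
        final String value;

        Message(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Pairs each data message with the rule registered for its topic.
    // Messages whose topic has no rule are dropped, as in an inner join.
    static List<String> innerJoin(List<Message> data, Map<String, String> rulesByTopic) {
        List<String> joined = new ArrayList<>();
        for (Message m : data) {
            String rule = rulesByTopic.get(m.topic);
            if (rule != null) {
                joined.add(m.topic + " | " + m.value + " | " + rule);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> rules = new HashMap<>();
        rules.put("sensor-temp", "temperature < 80");

        List<Message> data = new ArrayList<>();
        data.add(new Message("sensor-temp", "{\"temperature\":21.5}"));
        data.add(new Message("sensor-humidity", "{\"humidity\":40}"));

        // Only the sensor-temp record is printed; sensor-humidity has no matching rule.
        for (String row : innerJoin(data, rules)) {
            System.out.println(row);
        }
    }
}
```

This is why the rule must arrive with the same topic name as the data: a record whose topic has no matching rule never reaches the rule check at all.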

If we create micro-batches of 30-60 seconds and pass our rules dynamically from a UI in a template format, we can report live from NoSQL. A dynamic rule engine demo will be posted in the next version of this article.

I hope this helps people use Spark streaming in a more interactive way.

Thanks, keep Sparking!
