Integrating Kafka With Spark Structured Streaming
Learn the method to integrate Kafka with Spark for consuming streaming data amd discover how to unleash your streaming analytics needs.
Join the DZone community and get the full member experience.Join For Free
Kafka has its own stream library and is best for transforming Kafka topic-to-topic whereas Spark streaming can be integrated with almost any type of system. For more detail, you can refer to this blog.
In this blog, I’ll cover an end-to-end integration of Kafka with Spark structured streaming by creating Kafka as a source and Spark structured streaming as a sink.
Let’s create a Maven project and add following dependencies in
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.3</version> </dependency>
Now, we will be creating a Kafka producer that produces messages and pushes them to the topic. The consumer will be the Spark structured streaming DataFrame.
First, setting the properties for the Kafka producer.
val props = new Properties() props.put("bootstrap.servers", "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
bootstrap.servers: This contains the full list of servers with hostname and port. The list should be in the form of
host1: port, host2: port, and so on.
key.serializer: Serializer class for the key that implements serializer interface.
value.serializer: Serializer class for the key that implements the serializer interface.
Creating a Kafka producer and sending topic over the stream:
val producer = new KafkaProducer[String,String](props) for(count <- 0 to 10) producer.send(new ProducerRecord[String, String](topic, "title "+count.toString,"data from topic")) println("Message sent successfully") producer.close()
The send is asynchronous, and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. The result of the send is a
RecordMetadata specifying the partition the record was sent to and the offset it was assigned. After sending the data, close the producer using the
Kafka as a Source
Now, Spark will be a consumer of streams produced by Kafka. For this, we need to create a Spark session.
val spark = SparkSession .builder .appName("sparkConsumer") .config("spark.master", "local") .getOrCreate()
This is getting the topics from Kafka and reading it in Spark stream by subscribing to a particular topic that is to be provided in option. Following is the code to subscribe Kafka topics in Spark stream and read it using
val dataFrame = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "mytopic") .load()
Printing the schema of the DataFrame:
The output for the schema includes all the fields related to Kafka metadata.
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)
Create a dataset from DataFrame by casting the key and value from the topic as a string:
val dataSet: Dataset[(String, String)] =dataFrame.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
Write the data in the dataset to the console and hold the program from exit using the method
val query: StreamingQuery = dataSet.writeStream .outputMode("append") .format("console") .start() query.awaitTermination()
The complete code is on my GitHub.
Published at DZone with permission of Jatin Demla, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.