Comparing Akka Streams, Kafka Streams and Spark Streaming
We'll discuss the pros and cons of the frameworks and which to use when.
You can also find this article on the Rock the JVM blog or in video form on YouTube.
Kafka Streams
What Kafka Streams Looks Like
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable, Materialized}
import org.apache.kafka.streams.scala.{Serdes, StreamsBuilder}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  // implicit serializers/deserializers for the key and value types we use
  import Serdes._

  // connection details for the Kafka cluster
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "myFabulousWordCount")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker-url:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder

  // read the topic as a stream of key-value pairs
  val textLines: KStream[String, String] =
    builder.stream[String, String]("TextLinesTopic")

  // split lines into words, group by word, count into a table
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("word-counts-table"))

  // feed the results into another topic
  wordCounts.toStream.to("WordsWithCountsTopic")

  // start the topology and stop it gracefully on shutdown
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
    streams.close(Duration.ofSeconds(10))
  }
}
This is what a word count application looks like in Kafka Streams. This code is quite heavy to take in all at once, so I'll break it down.
import Serdes._
Kafka stores records in binary for performance, which means it's up to us to serialize and deserialize them. This import brings Scala's implicit serializers and deserializers (Serdes) into scope, so the conversions happen automatically.
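For our own data types, we'd have to supply a Serde ourselves. Here's a minimal sketch (my illustration, not from the article) using the fromFn helper from the Scala Serdes object, with a toy text encoding; a real application would use JSON, Avro, or similar:

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.Serde

// A made-up type, encoded as "word:count" text purely for illustration.
case class WordEvent(word: String, count: Long)

implicit val wordEventSerde: Serde[WordEvent] = Serdes.fromFn[WordEvent](
  (event: WordEvent) =>
    s"${event.word}:${event.count}".getBytes(StandardCharsets.UTF_8),
  (bytes: Array[Byte]) => {
    val Array(word, count) = new String(bytes, StandardCharsets.UTF_8).split(":")
    Option(WordEvent(word, count.toLong))
  }
)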
val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "myFabulousWordCount")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker-url:9092")
  p
}
The first part of an application invariably needs to configure the details of the Kafka cluster it's going to connect to. This will use a Java-style API, which I personally hate as a predominantly Scala programmer. Java folks might be much more comfortable with this.
val builder: StreamsBuilder = new StreamsBuilder

val textLines: KStream[String, String] =
  builder.stream[String, String]("TextLinesTopic")
Next, we read the records as key-value pairs from the topic that we want, using a builder pattern.
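As a side note (my addition, not part of the original pipeline), a quick way to sanity-check what's being consumed is a side-effecting foreach on the stream:

// Debug-only: print every record as it's consumed. This is a side effect,
// so keep it out of production topologies.
textLines.foreach((key, line) => println(s"key = $key, value = $line"))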
val wordCounts: KTable[String, Long] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
  .groupBy((_, word) => word)
  .count()(Materialized.as("word-counts-table"))
Then we will apply some functional programming operators on the stream, as if it were a collection, and we will turn that stream into a table. Kafka Streams has this notion of a table which allows for data aggregation and processing. Kafka has a stream-table duality which allows us to convert back and forth between them.
wordCounts.toStream.to("WordsWithCountsTopic")
Speaking of conversion, we might want to convert this table to a stream and feed it into another topic that some other application might be interested in reading from.
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()

sys.ShutdownHookThread {
  streams.close(Duration.ofSeconds(10))
}
And finally, we just need to start the streams and set up a graceful stop, because otherwise the topology is just a static description and won't process anything.
Kafka Streams Strengths and Weaknesses
Akka Streams
What Akka Streams Looks Like
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

implicit val system: ActorSystem = ActorSystem("WordCount")

// two sources emitting strings, and a sink printing (word, count) pairs
val source1 = Source(List("Akka", "is", "awesome"))
val source2 = Source(List("learning", "Akka", "Streams"))
val sink = Sink.foreach[(String, Int)](println)

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // fold all elements into a Map of counts, then emit its entries as a stream
  val wordCounter = Flow[String]
    .fold[Map[String, Int]](Map()) { (map, record) =>
      map + (record -> (map.getOrElse(record, 0) + 1))
    }
    .flatMapConcat(m => Source(m.toList))

  val merge = builder.add(Merge[String](2))
  val counter = builder.add(wordCounter)

  // the topology: both sources merged, counted, printed
  source1 ~> merge ~> counter ~> sink
  source2 ~> merge

  ClosedShape
}

RunnableGraph.fromGraph(graph).run()
So let's look at how we could build a word count application with Akka Streams. Even if you're experienced with Scala, this code might still look cryptic at first. This is one of the drawbacks of Akka Streams: from my experience and from my students' experience, it's generally pretty tough on beginners. But fear not, I'll break it down. Here are the main pieces of the code.
val source1 = Source(List("Akka", "is", "awesome"))
val source2 = Source(List("learning", "Akka", "Streams"))
val sink = Sink.foreach[(String, Int)](println)
The first three lines build the two original sources, which will emit the elements (in our case, strings) asynchronously, plus a sink that will print every (word, count) pair it receives.
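As an aside (my sketch, not in the original article): if all you need is a straight pipeline from one source to one sink, the GraphDSL isn't necessary at all. Assuming an implicit ActorSystem like the one above is in scope:

// A simple linear pipeline: source -> transformation -> sink, no graph needed.
Source(List("Akka", "is", "awesome"))
  .map(word => (word, 1))
  .runWith(Sink.foreach(println))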
val wordCounter = Flow[String]
  .fold[Map[String, Int]](Map()) { (map, record) =>
    map + (record -> (map.getOrElse(record, 0) + 1))
  }
  .flatMapConcat(m => Source(m.toList))
This flow folds every incoming element into a running Map of word counts, then emits the map's entries as a stream of (word, count) pairs.
val merge = builder.add(Merge[String](2))
val counter = builder.add(wordCounter)
source1 ~> merge ~> counter ~> sink
source2 ~> merge
However, that fold is not the most interesting piece, nor the big strength of Akka Streams. The magic happens here, where we simply plug the different streaming components together, each with its own logic:
source1 ~> merge ~> counter ~> sink
source2 ~> merge
So notice we have a very similar representation of the streaming topology directly in the code! Some of you might go "meh," but it's hard to overstate how easy it is to construct arbitrary streaming layouts in just a couple of lines of code that are completely asynchronous, high-speed, and fault-tolerant.
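To give a sense of "arbitrary layouts" (again, my sketch, not from the article): fan-out is just as easy as fan-in. Broadcast, from the same akka.stream.scaladsl package, sends each element to several outputs:

import akka.stream.scaladsl.Broadcast

// Fan-out: one source broadcast to two independent sinks.
val fanOutGraph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val broadcast = builder.add(Broadcast[String](2))

  Source(List("Akka", "Streams")) ~> broadcast
  broadcast.out(0) ~> Sink.foreach[String](word => println(s"left: $word"))
  broadcast.out(1) ~> Sink.foreach[String](word => println(s"right: $word"))

  ClosedShape
}

RunnableGraph.fromGraph(fanOutGraph).run()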
Akka Streams Strengths and Weaknesses
Spark Streaming
What Spark Streaming Looks Like
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Word count")
  .master("local[*]")
  .getOrCreate()

// read a stream of records from a Kafka topic
val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "your-kafka-broker:9092")
  .option("subscribe", "myTopic")
  .load()

// Kafka values arrive as binary, so cast them before aggregating
val wordCount = streamingDF
  .selectExpr("cast(value as string) as word")
  .groupBy("word")
  .count()

wordCount.writeStream
  .format("console")
  .outputMode("complete") // streaming aggregations require "complete" or "update", not "append"
  .start()
  .awaitTermination()
So let's go through the standard word counting application, and here we will use the high-level Structured Streaming API. Clean and neatly separable into steps. Let's break it down.
val spark = SparkSession.builder()
  .appName("Word count")
  .master("local[*]")
  .getOrCreate()
The only boilerplate you'll need is to start a Spark Session. After that...
val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "your-kafka-broker:9092")
  .option("subscribe", "myTopic")
  .load()
...you can read your data by specifying the data source to read from, and Spark Streaming naturally supports Kafka out of the box.
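As an aside, Kafka is only one of the built-in sources; for instance (my example, with placeholder host and port), a socket source is handy for quick local experiments:

// Reading lines from a TCP socket instead of Kafka -- useful for local
// testing, e.g. with `nc -lk 9999`.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()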
val wordCount = streamingDF
  .selectExpr("cast(value as string) as word")
  .groupBy("word")
  .count()
The actual logic is also plain and simple. In SQL, that's just a "group by" with a count, which we are doing here. Because Kafka stores data in binary, we have to add a cast at the beginning.
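If the SQL analogy helps, the same aggregation can be written as literal SQL over a temporary view (my equivalent rewrite of the logic above):

// Register the stream as a view and express the aggregation in SQL.
streamingDF
  .selectExpr("cast(value as string) as word")
  .createOrReplaceTempView("words")

val wordCountSql = spark.sql("SELECT word, COUNT(*) AS count FROM words GROUP BY word")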
wordCount.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()
Finally, all you have to do is point the stream to an output sink (where, again, we can use Kafka) and just start the streaming query. One caveat: because this query aggregates, the output mode must be "complete" or "update"; plain "append" is rejected for aggregations without a watermark.
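For completeness, here's a sketch (my addition; the topic name and checkpoint path are placeholders) of writing the counts back to Kafka instead of the console. The Kafka sink expects "key" and "value" columns and a checkpoint location:

// Write the aggregated counts back to Kafka. The Kafka sink expects string
// or binary "key" and "value" columns, and streaming queries to Kafka need
// a checkpoint directory for fault tolerance.
wordCount
  .selectExpr("word AS key", "cast(count AS string) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "your-kafka-broker:9092")
  .option("topic", "wordCounts")
  .option("checkpointLocation", "/tmp/word-count-checkpoint")
  .outputMode("update")
  .start()
  .awaitTermination()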
Spark Streaming Strengths and Weaknesses
What to Use When
Published at DZone with permission of Daniel Ciocirlan.