Comparison API for Apache Kafka
Learn about a variety of use cases for Kafka and Kafka's API, from consuming and writing data to streams to more reactive approaches with Akka.
With the growing demand for processing large amounts of data, Apache Kafka has become a standard message queue in the big data world. Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service, and it has convenient APIs for many languages.
In this article, I would like to share my experience with leveraging Kafka's API for multiple purposes, from consuming and writing data to streams to a more reactive approach with Akka. All examples in this tutorial are written in Scala. If you use another programming language, you can easily port the code from Scala.
First of all, you need to install Kafka. For this, I use a Docker image:
docker run --rm -it \
-p 2181:2181 -p 3030:3030 -p 7081:8081 \
-p 7082:8082 -p 7083:8083 -p 9092:9092 \
-e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev
Of course, you can use another image or launch Kafka manually; it's up to you.
Integrating Spark Streaming and Kafka is incredibly easy. Your middleware, backend (proxy-like), or IoT devices can send millions of records per second to Kafka, and Kafka handles them efficiently. Spark Streaming provides simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. First, we need to pass Kafka's parameters to Spark, such as the host, port, and offset-committing strategy:
import org.apache.kafka.common.serialization.StringDeserializer

def kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "127.0.0.1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "mygroup1",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
After setting the necessary configuration, we can work with the direct stream. All the logic for creating streams is located in the KafkaUtils class:
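import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val topics = Array("sample_topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent, // distributes partitions evenly across the available executors
  Subscribe[String, String](topics, kafkaParams)
)
// Print the key and value of every record in each batch.
stream.map(record => (record.key, record.value)).print()
// Print the offset range that each partition covered in this batch.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { _ =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}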
Note: The code above uses the Spark Streaming API, which we will discuss below.
Spark operates on RDDs (the basic abstraction in Spark; an RDD represents an immutable, partitioned collection of elements that can be operated on in parallel). Each batch of the stream, whose length is the batch interval set when the StreamingContext is created, is exposed as an RDD and can be processed with the foreachRDD method. In this example, we simply print the topic name, partition, and offset range for each partition. You can also use more complicated logic, like retrieving data from a stream of tweets.
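The `streamingContext` value used above is not defined in the snippet. A minimal sketch of how it might be created is shown here, assuming a local master and a five-second batch interval; the object name, master URL, application name, and interval are illustrative assumptions, not taken from the original project.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingContextSketch {
  // Assumption: local mode with two threads; replace with your cluster master.
  val conf: SparkConf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("Kafka010_DirectStream")

  // The second argument is the batch interval: every 5 seconds the records
  // received so far are grouped into one RDD and handed to foreachRDD.
  def createContext(): StreamingContext = new StreamingContext(conf, Seconds(5))
}
```

After the streams are defined on the context, call `start()` and `awaitTermination()` on it to begin processing.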
Spark's Kafka integration offers three different ways to store offsets and so guarantee fault-tolerant behavior. The first is checkpointing. The Spark documentation says:
"A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed."
In code, it looks as follows:
def kafkaStream010Checkpointing() =
launchWithCheckpointing(kafkaStreaming010,
appName = "Kafka010_DirectStream",
checkpointPath = "checkpointing")
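The `launchWithCheckpointing` helper belongs to the author's project and its body is not shown. One plausible shape for it, as a sketch only, uses Spark's `StreamingContext.getOrCreate`; the parameter types, the local master, and the five-second batch interval are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointingSketch {
  // Hypothetical stand-in for the article's launchWithCheckpointing helper.
  def launchWithCheckpointing(
      logic: StreamingContext => Unit,
      appName: String,
      checkpointPath: String): Unit = {

    // Called only when no checkpoint exists yet under checkpointPath.
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName(appName)
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointPath) // enable metadata and data checkpointing
      logic(ssc)                     // define the DStream graph before start()
      ssc
    }

    // Recover the context from the checkpoint if one exists, otherwise build it.
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keep in mind that after a failure Spark may replay a batch, so outputs should be idempotent, and a checkpoint generally cannot be recovered once the application code has changed.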
I call the second strategy the "Kafka itself" strategy. Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer periodically auto-commits offsets, which is why enable.auto.commit is set to false in the configuration above. Once the output of a batch has been stored, you can commit its offsets to Kafka using the commitAsync API. Kafka is not transactional, so your outputs must still be idempotent. In code, it looks as follows:
def storingOffsetsItself(stream: InputDStream[ConsumerRecord[String, String]]) = {
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
}
The last strategy is using your own data store. You can use storage such as an RDBMS or ZooKeeper for storing offsets; this is a very popular solution. If results and offsets are saved together in one transaction, it gives the equivalent of exactly-once semantics. This strategy is especially useful when it's hard to make the logic idempotent, for example with complicated aggregations:
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"),
resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
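To see why a separate data store can give the equivalent of exactly-once semantics, here is a minimal sketch in plain Scala with no Spark or Kafka dependencies. `Record`, `StoreState`, and `applyBatch` are hypothetical names; the immutable `StoreState` stands in for a database row that holds both the aggregated result and the next offset per partition, updated in a single transaction.

```scala
// Hypothetical record shape: partition, offset within the partition, payload.
final case class Record(partition: Int, offset: Long, value: Int)

// Result and offsets live in one value, modelling one transactional update.
final case class StoreState(sum: Long, nextOffsets: Map[Int, Long])

object OffsetStoreSketch {
  // Folds a batch into the state, skipping records that were already applied.
  def applyBatch(state: StoreState, batch: Seq[Record]): StoreState =
    batch.foldLeft(state) { (s, r) =>
      val next = s.nextOffsets.getOrElse(r.partition, 0L)
      if (r.offset < next) s // replayed record: the result already includes it
      else StoreState(s.sum + r.value, s.nextOffsets.updated(r.partition, r.offset + 1))
    }
}
```

Applying the same batch twice leaves the state unchanged, which is exactly what a replay after a failure needs. In Spark, a map like `fromOffsets` is typically passed to `ConsumerStrategies.Assign` when the direct stream is created, so that consumption resumes from the stored positions.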
If you want to read data between certain offsets, you can simply obtain RDDs that represent content in this range in the topic:
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("sample_topic", 0, 10, 20),
OffsetRange("sample_topic", 1, 10, 20)
)
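import java.{util => ju}
import scala.collection.JavaConverters._
// createRDD expects a java.util.Map, so convert the Scala Map explicitly.
val params = new ju.HashMap[String, Object](kafkaParams.asJava)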
val kafkaRDD = KafkaUtils.createRDD[String, String](sparkContext,
params ,
offsetRanges,
PreferConsistent)
println(kafkaRDD.map(_.value()).first())
Pay attention to the difference between createStream and createDirectStream. You can read more about the differences between them here. This is an important distinction, and each has its own use cases.
Kafka also integrates seamlessly with binary formats like Avro and Protobuf. The integration of Apache Spark with Kafka and Avro can be organized in a separate module and included on demand (using Twitter's Bijection simplifies the conversion code):
@transient lazy implicit private val avroSpecificInjection = SpecificAvroCodecs.toBinary[A]
def decodeValue(payload: KafkaPayload): Option[A] = {
val decodedTry = Injection.invert[A, Array[Byte]](payload.value)
decodedTry match {
case Success(record) =>
Some(record)
case Failure(ex) =>
logger.warn("Could not decode payload", ex)
None
}
}
def encodeValue(value: A): KafkaPayload = {
val encoded = Injection[A, Array[Byte]](value)
KafkaPayload(None, encoded)
}
Spark Structured Streaming API
Spark structured streaming is one of the most exciting ideas presented in Apache Spark. It's the next step in the process of developing Spark Streaming. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. Structured streaming gives the ability to build ETL pipelines in a very clear way. As a result, the code is similar to the Java 8 Stream API, so if you don't know Scala but know Java, it will not be difficult for you to understand what is happening:
val sparkSession = SparkSession
.builder
.master("local")
.appName("kafka")
.getOrCreate()
sparkSession.sparkContext.setLogLevel("ERROR")
import sparkSession.implicits._
val kafkaDF = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "structured_topic")
.load()
val data: Dataset[(String, String)] = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
kafkaDF.printSchema()
data.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
Writing Data to Kafka Stream
There is a third-party library for this not-so-standard task. It provides a simple API for writing data to the stream. The next example shows how to read data from a socket and write it to the stream:
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9087)
lines.writeToKafka(numbersProducerConfig,
s => new ProducerRecord[String, String](topic, "key " + s , s.toString)
)
ssc.start()
ssc.awaitTermination()
Akka Streams
Akka Streams Kafka, also known as Reactive Kafka, is an Akka Streams connector to Apache Kafka. Akka Streams allows you to write data to Kafka topics via a Sink API:
val done = Source(50 to 60)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: $elem")
new ProducerRecord[Array[Byte], String](topic, elem)
}.runWith(Producer.plainSink(producerSettings))
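The `producerSettings` value is not defined in the snippet above. A minimal sketch of how it might be built follows, assuming the broker from the Docker image is reachable at 127.0.0.1:9092; the object name and the actor system name are illustrative.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object ProducerSettingsSketch {
  // Assumption: a dedicated actor system; reuse your application's system if you have one.
  implicit val system: ActorSystem = ActorSystem("kafka-sketch")

  // Keys are raw bytes and values are strings, matching
  // ProducerRecord[Array[Byte], String] in the examples.
  val producerSettings: ProducerSettings[Array[Byte], String] =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("127.0.0.1:9092")
}
```

Depending on the Akka version, an implicit materializer may also have to be in scope before `runWith` can be called.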
And exactly the same via a Flow API:
val done = Source(100 to 111)
.map { n =>
val partition = 1
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
topic , partition, null, n.toString
), n)
}
.via(Producer.flow(producerSettings))
.map { result =>
val record = result.message.record
println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
s"(${result.message.passThrough})")
result
}
.runWith(Sink.ignore)
Consuming data with Akka Streams is just as clear, and you can build sophisticated data flows with the Graph DSL in which Kafka is only one part:
val done = Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1))
.map { msg =>
println(s"topic1 -> topic2: $msg")
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
topic2,
msg.record.value
), msg.committableOffset)
}
.via(Producer.flow(producerSettings))
.mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl()
}
.runWith(Sink.ignore)
As in the example with Apache Spark, you can save offsets in a database or in ZooKeeper:
private val offset = new AtomicLong(2)
def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
println(s"DB.save: ${record.value}")
offset.set(record.offset)
Future.successful(Done)
}
def loadOffset(): Future[Long] =
Future.successful(offset.get)
def update(data: String): Future[Done] = {
println(s"DB.update: $data")
Future.successful(Done)
}
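The snippet above only shows the storage side. A sketch of how a stored offset could drive consumption is given below, assuming an Akka 2.5-era API with `ActorMaterializer`; the object and method names, the group id, and passing `loadOffset` and `save` as function parameters are illustrative choices.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future

object ExternalOffsetSketch {
  def consumeFromStoredOffset(
      topic: String,
      partition: Int,
      loadOffset: () => Future[Long],
      save: ConsumerRecord[Array[Byte], String] => Future[Done]
  )(implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val consumerSettings =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers("127.0.0.1:9092")
        .withGroupId("mygroup1")

    // Read the stored position first, then assign the partition from that offset.
    loadOffset().flatMap { fromOffset =>
      val subscription = Subscriptions.assignmentWithOffset(
        new TopicPartition(topic, partition) -> fromOffset)
      Consumer.plainSource(consumerSettings, subscription)
        .mapAsync(1)(save) // store each record together with its offset
        .runWith(Sink.ignore)
    }
  }
}
```

Because `save` in the example stores the offset of the last processed record, resuming from that value reads that record once more; storing `record.offset + 1` instead would avoid the repeat.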
Akka Actors
Akka lets you implement the logic for producing and consuming Kafka messages with the Actor model. This is very convenient if actors are widely used in your code, and it significantly simplifies building data pipelines with actors. For example, suppose you have an Akka Cluster in which one part crawls web pages and another part indexes them and sends the indexed data to Kafka. The consumer can aggregate this logic. Producing data to Kafka looks as follows:
def actorProducer() = {
val system = ActorSystem()
val producer = system.actorOf(KafkaProducerActor.props(kafkaProducerConf))
val batch: Seq[ProducerRecord[String, String]] = Seq(
KafkaProducerRecord(topic, "foo"),
KafkaProducerRecord(topic, "key", "value"),
KafkaProducerRecord(topic, "bar")
)
val message = ProducerRecords(batch)
producer ! message
}
Consuming messages is straightforward: you set a supervisor strategy for handling failures and write the logic for incoming records in the receive method:
class ConsumerRecovery(kafkaConfig: KafkaConsumer.Conf[String, String],
actorConfig: KafkaConsumerActor.Conf) extends Actor with ActorLogging {
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10) {
case _: KafkaConsumerActor.ConsumerException =>
log.info("Consumer exception caught. Restarting consumer.")
SupervisorStrategy.Restart
case _ =>
SupervisorStrategy.Escalate
}
val recordsExt: Extractor[Any, ConsumerRecords[String, String]] = ConsumerRecords.extractor[String, String]
val consumer: ActorRef = context.actorOf(
KafkaConsumerActor.props(kafkaConfig, actorConfig, self)
)
consumer ! Subscribe.AutoPartition(List(topic))
override def receive: Receive = {
// Consume from Kafka
case recordsExt(records) =>
processRecords(records.pairs)
sender() ! Confirm(records.offsets, commit = true)
}
private def processRecords(records: Seq[(Option[String], String)]) =
records.foreach { case (key, value) =>
log.info(s"Received [$key,$value]")
}
}
Kafka has supported SSL/TLS since version 0.9, and I strongly recommend using encryption everywhere in a production environment. Configuring keys and certificates in multiple locations is a routine task, so I collected all the necessary scripts and configuration for it here.
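On the client side, encryption comes down to a few extra configuration entries. The sketch below uses standard Kafka client property names; the object name, file paths, and passwords are placeholders, not values from the original project.

```scala
object SslParamsSketch {
  // Placeholder paths and passwords: replace with your own key material.
  val sslParams: Map[String, Object] = Map(
    "security.protocol" -> "SSL",
    "ssl.truststore.location" -> "/path/to/client.truststore.jks",
    "ssl.truststore.password" -> "changeit",
    // The keystore entries are only needed when the broker requires client authentication.
    "ssl.keystore.location" -> "/path/to/client.keystore.jks",
    "ssl.keystore.password" -> "changeit",
    "ssl.key.password" -> "changeit"
  )

  // Merges the SSL entries into an existing client configuration such as kafkaParams.
  def withSsl(base: Map[String, Object]): Map[String, Object] = base ++ sslParams
}
```

When using it, `bootstrap.servers` must also point at the broker's SSL listener rather than the plaintext port.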
That's all. You can find a full listing of the source code on my GitHub repository.