How to Stream Records to Kafka With Akka Streams and Alpakka
Welcome to Kafka streaming, with a helping hand from Akka Streams and the open source Alpakka.
Join the DZone community and get the full member experience.
Join For FreeThis blog will show you how records can be streamed to Kafka using Akka Streams with Alpakka. Alpakka is an open source project that provides a number of connectors, and in this blog, we will use the Alpakka connector for Kafka.
Before digging into it further, you can read more about Kafka here.
This blog will cover the producer stream only. Prior to starting the implementation, let's take a quick look at the codebase. First, we will add the library dependency of the Alpakka Kafka connector to the build.sbt, then we'll create the producer stream. The producer is responsible for producing (writing/publishing) data to Kafka.
The prerequisites for this project are:
- Scala Version: 2.12.8
- Bootstrap servers of the Kafka cluster
- Alpakka Kafka 1.0-RC2
Let's begin!
- Add your library dependencies:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.0-RC1"
- To create a producer stream, you need to define a ProducerSetting, which creates a setting from the configuration and consists of the Bootstrap server of a Kafka Cluster key and value serializer, which must be passed explicitly. Here, Kafka is running on localhost:9092. You may also pass a list of comma-separated host port pairs.
val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer) .withBootstrapServers("http://localhost:9092")
- Now we need to create a sink to write records to Kafka topics via Producer.plainSink. That will publish the records to our Kafka topic and take ProducerRecords, which consists of the Kafka topic for which the record needs to be sent and the value, i.e record content:
object KafkaProducer extends App { implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val config = ConfigFactory.load.getConfig("akka.kafka.producer") val producerSettings = ProducerSettings(config, new StringSerializer, new StringSerializer) .withBootstrapServers("http://localhost:9092") val producerSink: Future[Done] = Source(1 to 10) .map(_.toString) .map(value => new ProducerRecord[String, String]("test", "Record:: " + value)) .runWith(Producer.plainSink(producerSettings)) println("************ Message produced ************") }
4. Now to run and test the Kafka producer, please follow the steps below:
- Start zookeeper:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- Start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
- After creating the topic, start the consumer to check the records:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
--topic test --from-beginning
Thanks!
Reference
Published at DZone with permission of Teena Vashist. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments