Developing Apache Kafka Producers and Consumers
I gave a presentation recently on Real-time streaming and data pipelines with Apache Kafka.
A correction to the talk (~22 minutes in): I said that all of your topic data has to fit on one server. That is not true. A single log cannot span servers, so all of the data for a given partition has to fit on one server, but Kafka spreads the partitions of a topic across the brokers for you.
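For example, creating a topic with several partitions lets Kafka distribute those partitions across the brokers in the cluster; each partition's log stays on one server, but the topic as a whole does not. A rough illustration (the admin script and flag names vary by Kafka release; kafka-topics.sh is the newer form, and the ZooKeeper address shown is the one from the Vagrant setup below):

# create a topic whose 3 partitions Kafka will spread across the available brokers
bin/kafka-topics.sh --create --zookeeper 192.168.86.5:2181 \
  --partitions 3 --replication-factor 1 --topic test-topic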
For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.
To get up and running, use Vagrant.
1) Install Vagrant http://www.vagrantup.com/
2) Install VirtualBox https://www.virtualbox.org/
git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test
Your entry point is the test file.
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ly.stealth.testing

import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter

class KafkaSpec extends Specification with Logging {

  "Simple Producer and Consumer" should {

    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString

      var testStatus = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic, "192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer = new KafkaConsumer(testTopic, groupId_1, "192.168.86.5:2181")

      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }

      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")

      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic, "192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer1 = new KafkaConsumer(testTopic, groupId_1, "192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic, groupId_2, "192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

  "Akka Producer and Consumer" should {

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producer testing")
      val system = ActorSystem("testing")
      val actorCount = 1
      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")
      1 to actorCount foreach { i => (producer ! (testTopic, "192.168.86.10:9092")) }
      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic, groupId_1, "192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic, groupId_2, "192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}
On the producer side I have started to look more into using Akka. The prototype for this implementation is in the "Akka Producer and Consumer" test case above and is broken out here.
"Akka Producer and Consumer" should { "send string to broker and consume that string back in different consumer groups" in { val testMessage = UUID.randomUUID().toString val testTopic = UUID.randomUUID().toString val groupId_1 = UUID.randomUUID().toString val groupId_2 = UUID.randomUUID().toString var testStatus1 = false var testStatus2 = false info("starting akka producertesting") val system = ActorSystem("testing") val actorCount = 1 val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router") 1 to actorCount foreach { i =>( producer ! (testTopic,"192.168.86.10:9092")) } producer ! testMessage val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181") def exec1(binaryObject: Array[Byte]) = { val message1 = new String(binaryObject) info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1) testMessage must_== message1 consumer1.close() testStatus1 = true } info("KafkaSpec : consumer 1 - is waiting some seconds") consumer1.read(exec1) info("KafkaSpec : consumer 1 - consumed") val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181") def exec2(binaryObject: Array[Byte]) = { val message2 = new String(binaryObject) info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2) testMessage must_== message2 consumer2.close() testStatus2 = true } info("KafkaSpec : consumer 2 - is waiting some seconds") consumer2.read(exec2) info("KafkaSpec : consumer 2 - consumed") testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test } } }
What is really nice is that you can increase actorCount to run more producer actors behind the round-robin router and start generating real test loads to analyze.
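The KafkaAkkaProducer actor itself lives in the repo's kafka.akka package and is not shown above. As a minimal sketch of the pattern (hypothetical, assuming the repo's KafkaProducer wrapper and its sendString method), each routee is configured once with a (topic, brokerList) tuple and then publishes every plain string it receives:

package kafka.akka

import akka.actor.Actor
import kafka.producer.KafkaProducer

// Hypothetical sketch; the real implementation is in the
// stealthly/scala-kafka repository.
class KafkaAkkaProducer extends Actor {

  private var producer: Option[KafkaProducer] = None

  def receive = {
    // first message: build this routee's underlying producer
    case (topic: String, brokerList: String) =>
      producer = Some(new KafkaProducer(topic, brokerList))
    // subsequent messages: send the payload to the configured topic
    case message: String =>
      producer.foreach(_.sendString(message))
  }
}

This is why the test sends the (testTopic, broker) tuple actorCount times before producing: with the RoundRobinRouter, that configures every routee, after which string messages fan out across them round-robin.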
/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/