Platinum Partner
apache,server-side,enterprise integration,apache kafka,integration,open source

Developing Apache Kafka Producers and Consumers

 I gave a presentation recently on Real-time streaming and data pipelines with Apache Kafka.


A correction in the talk (~ 22 minutes in) : I said that you have to have all your topic data fit on one server.  That is not true, you can’t span logs so you have to have all of your data for a partition fit on one server.  Kafka will spread your partitions around for you within topics.


For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.

To get up and running, use vagrant.

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test

Your entry point is the test file.


/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ly.stealth.testing

import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter

class KafkaSpec extends Specification with Logging {

  "Simple Producer and Consumer" should {
    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString

      var testStatus = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }

      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")

      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>(
        producer ! (testTopic,"192.168.86.10:9092"))
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}


On the producer side I have started to look more into using Akka. The prototype for this implementation is in the test case above “Akka Producer and Consumer” and broken out here.

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>(
        producer ! (testTopic,"192.168.86.10:9092"))
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}


What is really nice is you can up that actorCount and really start to run test data to analyze.

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/

Published at DZone with permission of {{ articles[0].authors[0].realName }}, DZone MVB. (source)

Opinions expressed by DZone contributors are their own.

{{ tag }}, {{tag}},

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}
{{ parent.authors[0].realName || parent.author}}

{{ parent.authors[0].tagline || parent.tagline }}

{{ parent.views }} ViewsClicks
Tweet

{{parent.nComments}}