
Deploying Kafka-Dependent Scala Microservices With Docker

Learn how to use Docker to deploy Scala microservices in this tutorial, which also shows how to create a Kafka producer service.

In this post, we will see how to use docker-compose and sbt-docker to deploy Scala microservices. We will create a service that streams tweets via the Twitter streaming API and puts them in Kafka, and another that reads those tweets from Kafka and counts the hashtags in each one. We will then use the sbt-docker plugin to create a separate Docker image for each service, and finally use docker-compose to define the environment for our services and run them.

The full code and instructions to run the example are available at https://github.com/saumitras/kafka-twitter-docker.

Our application will consist of the following services:

  1. Kafka and Zookeeper (the Kafka service depends on Zookeeper)

  2. Tweet producer (depends on the Kafka service)

  3. Tweet consumer (depends on the Kafka service)

Here are the high-level steps:

  1. Create a Kafka producer service to get tweets using the Twitter streaming API.

  2. Create a Kafka consumer service to read tweets from Kafka and get the count of tags.

  3. Use sbt-docker to package and create images for these services.

  4. Create a docker-compose.yml file to define the services and use docker-compose to start a multi-container environment.

  5. Run the services.

Set Up the Twitter Stream

To stream data from Twitter, you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get these. After creating the app, open the “Keys and Access Tokens” section and copy the following:

  • Consumer Key (API Key)

  • Consumer Secret (API Secret)

  • Access Token

  • Access Token Secret
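
These keys are hardcoded in the snippet below for simplicity, but you may prefer to read them from environment variables so they stay out of source control. A minimal sketch; the variable names here are illustrative and not part of the original project:

// Hypothetical: read Twitter credentials from environment variables
// (variable names are illustrative).
val cb = new ConfigurationBuilder()
cb.setOAuthConsumerKey(sys.env("TWITTER_CONSUMER_KEY"))
cb.setOAuthConsumerSecret(sys.env("TWITTER_CONSUMER_SECRET"))
cb.setOAuthAccessToken(sys.env("TWITTER_ACCESS_TOKEN"))
cb.setOAuthAccessTokenSecret(sys.env("TWITTER_ACCESS_TOKEN_SECRET"))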

Once you have the keys, you can create a StatusListener provided by the twitter4j library:

import twitter4j._
import twitter4j.conf.ConfigurationBuilder

val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")

def startTweetStream() = {
  val cb = new ConfigurationBuilder()
  cb.setDebugEnabled(true)
  cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk")                              // replace with your own key
  cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt")  // replace with your own secret
  cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn")     // replace with your own token
  cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU")    // replace with your own token secret

  val stream = new TwitterStreamFactory(cb.build()).getInstance()

  val listener = new StatusListener {

    override def onTrackLimitationNotice(i: Int) = println(s"Track limited $i tweets")
    override def onStallWarning(stallWarning: StallWarning) = println("Stream stalled")
    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) = println(s"Status ${statusDeletionNotice.getStatusId} deleted")
    override def onScrubGeo(l: Long, l1: Long) = println(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")
    override def onException(e: Exception) = println("Exception occurred. " + e.getMessage)

    override def onStatus(status: Status): Unit = {
      val tweet = status.getText
      println("[Producer] " + tweet)

    }

  }

  stream.addListener(listener)
  val fq = new FilterQuery()
  fq.track(KEYWORDS.mkString(","))
  stream.filter(fq)

}

You will now see tweets being printed on your console. In the next step, instead of printing them, we will send them to Kafka under the tweets topic.

Create Kafka Producer Service

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

object TweetProducer extends App {

  val BROKER_LIST = "kafka:9092"  // "kafka" is the broker's hostname inside the docker-compose network defined later

  val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")
  val TOPIC = "tweets"

  val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("client.id", "KafkaTweetProducer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[String, String](props)

  def startTweetStream() = {
      // ... same stream and listener setup as before, except that onStatus
      // now sends each tweet to Kafka instead of printing it:

      override def onStatus(status: Status): Unit = {
        val tweet = status.getText
        val data = new ProducerRecord[String, String](TOPIC, tweet)
        producer.send(data)
      }
      // ...

  }

  startTweetStream()

}
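
Optionally, you can verify that tweets are reaching Kafka before writing the consumer. Assuming the Kafka 0.10 scripts are on your PATH, the console consumer that ships with Kafka can tail the topic (use whichever host:port ZooKeeper is reachable at from where you run it; localhost:2181 works once the compose setup below maps the port):

kafka-console-consumer.sh --zookeeper localhost:2181 --topic tweets --from-beginning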

This completes our tweet-producer service. Next, we will define a consumer service to read data from the tweets topic and count the hashtags in each tweet.

Create Kafka Consumer Service

import java.util.Properties

import kafka.serializer.DefaultDecoder
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

object TweetConsumer extends App {

  val ZK_HOST = "zookeeper:2181"

  val TOPIC = "tweets"

  private val props = new Properties()
  props.put("group.id", "tweet-consumer")
  props.put("zookeeper.connect", ZK_HOST)
  props.put("auto.offset.reset", "smallest")              // start from the earliest available offset
  props.put("consumer.timeout.ms", "120000")              // stop iterating if no message arrives for 2 minutes
  props.put("zookeeper.connection.timeout.ms", "20000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(TOPIC)

  def read() = try {
    val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,
      new DefaultDecoder(), new DefaultDecoder())(0)

    lazy val iterator = streams.iterator()

    while (iterator.hasNext()) {
      val tweet = iterator.next().message().map(_.toChar).mkString
      val numTags = tweet.count(_ == '#')
      println(s"[Consumer] [TagCount=$numTags] $tweet")
    }

  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }

  read()

}
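
Counting '#' characters is a rough heuristic, since a '#' can also appear outside a hashtag. If you want something slightly more precise, here is a small regex-based sketch (a refinement of mine, not part of the original project):

// Hypothetical refinement: count tokens that look like hashtags
// (a '#' followed by one or more word characters).
val HashtagPattern = "#\\w+".r
def countTags(tweet: String): Int = HashtagPattern.findAllIn(tweet).length

On the producer side, twitter4j's status.getHashtagEntities would give the exact count, but the consumer here only sees the raw tweet text.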

This completes our tweet-consumer service. Next, we need to package our services as standalone jars and create a separate Docker image for each. We will use the sbt-docker plugin for this. But before we do that, note that if you want to use sbt-docker as a non-root user, you need to configure Docker accordingly.

Running Docker Commands as a Non-Root User

The Docker daemon always runs as the root user, and it binds to a Unix socket instead of a TCP port. By default, that Unix socket is owned by root, so you can only access it with sudo. Since we want to package our application as a non-root user, we need to make sure that sbt-docker can access the socket without root privileges. Any Unix user belonging to the docker group can read and write that socket, so you need to add your user to the docker group.

To add your user to the docker group (this requires sudo), run the following commands:

sudo usermod -aG docker <username>
newgrp docker

Verify that you can use it as a non-root user by running docker ps.

sbt-docker Plugin

Now we will create images for our producer and consumer services using sbt-docker.

Add sbt-docker as a dependency in project/plugins.sbt:

addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")

We start by defining Docker settings in build.sbt:

def dockerSettings(debugPort: Option[Int] = None) = Seq(

  dockerfile in docker := {
    val artifactSource: File = assembly.value
    val artifactTargetPath = s"/project/${artifactSource.name}"
    val scriptSourceDir = baseDirectory.value / "../scripts"
    val projectDir = "/project/"

    new Dockerfile {
      from("saumitras01/java:1.8.0_111")
      add(artifactSource, artifactTargetPath)
      copy(scriptSourceDir, projectDir)
      entryPoint(s"/project/start.sh")
      cmd(projectDir, s"${name.value}", s"${version.value}")
    }
  },
  imageNames in docker := Seq(
    ImageName(s"saumitras01/${name.value}:latest")
  )
)
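
For orientation, the Dockerfile this DSL generates for, say, the producer is roughly equivalent to the following (jar name and version taken from the build output shown later):

FROM saumitras01/java:1.8.0_111
ADD tweet-producer-app-assembly-1.0.jar /project/tweet-producer-app-assembly-1.0.jar
COPY scripts /project/
ENTRYPOINT ["/project/start.sh"]
CMD ["/project/", "tweet-producer-app", "1.0"]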

lazy val producer = (project in file("tweet-producer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "org.twitter4j" % "twitter4j-core" % "4.0.4",
      "org.twitter4j" % "twitter4j-stream" % "4.0.4",
      "org.apache.kafka" % "kafka_2.11" % "https://www.linkedin.com/redir/invalid-link-page?url=0%2e10%2e0%2e0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri")
    ),
    dockerSettings()
  )

lazy val consumer = (project in file("tweet-consumer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "https://www.linkedin.com/redir/invalid-link-page?url=0%2e10%2e0%2e0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
    dockerSettings()
  )

Here’s what it means:

  1. Package the application using sbt-assembly to get a standalone jar for each of the producer and consumer projects; inside the image, the jar is placed at artifactTargetPath.
  2. Create a new Docker image using saumitras01/java:1.8.0_111 as the base image.
  3. Copy the jar into the /project dir of the image.
  4. Copy the startup scripts from your local ../scripts dir to the image's /project/ dir.
  5. Execute /project/start.sh as soon as the container starts for each service. This script makes the container wait until Kafka is up, then starts the consumer/producer service (a sketch of such a script follows this list).
  6. Name the images saumitras01/tweet-producer-app and saumitras01/tweet-consumer-app and create a "latest" tag for each of them.
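
The actual start.sh lives in the repository's scripts/ dir and is not reproduced in this post. A minimal sketch of such a wait-then-launch script, assuming nc (netcat) is available in the base image and using the three arguments passed by cmd() above (project dir, project name, version):

#!/bin/sh
# Hypothetical sketch of start.sh: block until the Kafka broker accepts
# connections, then launch the assembled jar built from the cmd() args,
# e.g. /project/tweet-consumer-app-assembly-1.0.jar.
until nc -z kafka 9092; do
  echo "Waiting for Kafka..."
  sleep 3
done
exec java -jar "$1$2-assembly-$3.jar"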

We then use these settings when building both the producer and consumer projects. You are now ready to create the images for your services. Start sbt and run the docker task:

sam@sam-ub:kafkatwitterdocker$ sbt
[info] Loading project definition from /home/sam/projects/kafkatwitterdocker/project
[info] Set current project to KafkaTwitterDockerExample (in build file:/home/sam/projects/kafkatwitterdocker/)
> docker

You will see something like below for both the tweet-producer and tweet-consumer projects:

[info] Step 1 : FROM saumitras01/java:1.8.0_111
[info]  ---> d53c800d525c
[info] Step 2 : ADD 0/tweet-consumer-app-assembly-1.0.jar /project/tweet-consumer-app-assembly-1.0.jar
[info]  ---> db7146cacb9d
[info] Removing intermediate container 81881e37217f
[info] Step 3 : COPY 1/scripts /project/
[info]  ---> d2969bcdbfc1
[info] Removing intermediate container f7ab67074a2e
[info] Step 4 : ENTRYPOINT /project/start.sh
[info]  ---> Running in 34eb4d7d282c
[info]  ---> bee7490cbfb0
[info] Removing intermediate container 34eb4d7d282c
[info] Step 5 : CMD /project/ tweet-consumer-app 1.0
[info]  ---> Running in 7d16d4e7ef45
[info]  ---> fd8af3ffe7c1
[info] Removing intermediate container 7d16d4e7ef45
[info] Successfully built fd8af3ffe7c1
[info] Tagging image fd8af3ffe7c1 with name: saumitras01/tweet-consumer-app:latest
[success] Total time: 44 s, completed 4 Nov, 2017 11:33:14 PM

Once it's done, run the docker images command and you will see your images listed:

sam@sam-ub:kafkatwitterdocker$ docker images | grep saumitras01

saumitras01/tweet-consumer-app   latest              fd8af3ffe7c1        2 days ago           659.9 MB
saumitras01/tweet-producer-app   latest              4b87ea6e633c        2 days ago           660.3 MB
saumitras01/java                 1.8.0_111           d53c800d525c        2 days ago           643.2 MB

Docker Compose

Now that we have all the images available, we need to create a docker compose config file to define all the services, dependencies, and ports.

Before we can start a producer or consumer container, we need to make sure that Kafka is running. Kafka depends on the Zookeeper service. Let's start by defining config for the Zookeeper service:

zookeeper:
  image: saumitras01/zookeeper:v3.4.9
  ports:
    - "2181:2181"

This will pull tag v3.4.9 of the saumitras01/zookeeper repo, start a container, and expose its port 2181 as zookeeper:2181.

Next, we define config for the Kafka service:

kafka:
  image: saumitras01/kafka:latest
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

This will pull the latest tag from the saumitras01/kafka repo and start Kafka on port 9092. Because KAFKA_ADVERTISED_HOST_NAME is set to kafka, the broker advertises itself as kafka:9092, which is the address the producer and consumer containers use to reach it.

Next, we define the consumer and producer services. Here’s the complete content of docker-compose.yml to start one producer and one consumer:

services:

  zookeeper:
    image: saumitras01/zookeeper:v3.4.9
    ports:
      - "2181:2181"

  kafka:
    image: saumitras01/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
        - /var/run/docker.sock:/var/run/docker.sock

  tweet-producer:
    image: saumitras01/tweet-producer-app:latest
    ports:
      - "8080:8080"tty: true
    depends_on:
      - kafka

  tweet-consumer:
    image: saumitras01/tweet-consumer-app:latest
    ports:
      - "8081:8081"tty: true
    depends_on:
      - kafka

If you only want to start some of the services on a given machine, remove the others' config from your docker-compose.yml definition.

We are now ready to run our application. Run this command from the root project directory:

docker-compose up

Your services will start, and you should see output like the following:

Creating kafkatwitterdocker_zookeeper_1 ...
Creating kafkatwitterdocker_zookeeper_1 ... done
Creating kafkatwitterdocker_kafka_1 ...
Creating kafkatwitterdocker_kafka_1 ... done
Creating kafkatwitterdocker_tweet-consumer_1 ...
Creating kafkatwitterdocker_tweet-producer_1 ...
Creating kafkatwitterdocker_tweet-producer_1
Creating kafkatwitterdocker_tweet-consumer_1 ... done
Attaching to kafkatwitterdocker_zookeeper_1, kafkatwitterdocker_kafka_1, kafkatwitterdocker_tweet-producer_1, kafkatwitterdocker_tweet-consumer_1

....

....

tweet-producer_1  | [Producer] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-consumer_1  | [Consumer] [TagCount=3] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-producer_1  | [Producer] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI…
tweet-consumer_1  | [Consumer] [TagCount=4] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI…
tweet-producer_1  | [Producer] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood & Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0…
tweet-consumer_1  | [Consumer] [TagCount=1] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood & Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0…
tweet-producer_1  | [Producer] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…
tweet-consumer_1  | [Consumer] [TagCount=8] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…

If you run docker ps, you will see different containers started for your services:

$ docker ps
CONTAINER ID        IMAGE                                   COMMAND                  CREATED             STATUS              PORTS                                                NAMES
ca5d617191fe        saumitras01/tweet-producer-app:latest   "/project/start.sh /p"   11 seconds ago      Up 8 seconds        0.0.0.0:8080->8080/tcp                               kafkatwitterdocker_tweet-producer_1
9717ceba31f0        saumitras01/tweet-consumer-app:latest   "/project/start.sh /p"   11 seconds ago      Up 8 seconds        0.0.0.0:8081->8081/tcp                               kafkatwitterdocker_tweet-consumer_1
e47522435b83        saumitras01/kafka:latest                "start-kafka.sh"         13 seconds ago      Up 11 seconds       0.0.0.0:9092->9092/tcp                               kafkatwitterdocker_kafka_1
3810212cb549        saumitras01/zookeeper:v3.4.9            "/bin/sh -c '/usr/sbi"   14 seconds ago      Up 12 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkatwitterdocker_zookeeper_1

There are a lot of other things to consider when going into production with Docker, but I hope this post gave you an idea of how the basic workflow will look.

