Over a million developers have joined DZone.

Building an Apache Kafka Messaging Producer on Bluemix

DZone's Guide to

Building an Apache Kafka Messaging Producer on Bluemix

Kafka 0.9 was recently released, and now available as a beta Message Hub in Bluemix. Here's how to create a messaging producer on IBM Bluemix.

· Integration Zone
Free Resource

Modernize your application architectures with microservices and APIs with best practices from this free virtual summit series. Brought to you in partnership with CA Technologies.

Messaging systems have the benefit of decoupling the processing of data (consumers) from the data producers and with Kafka you can scale the consumers easily and separately. Furthermore, via asynchronous processing you can offload work and improve the application user experience. These capabilities are especially important for cloud native applications with microservices.

Recently Kafka 0.9 was released which is now available as Message Hub (beta) service in Bluemix. For developers there are different APIs available and my colleague Niall Weedon provided samples for how to use them. There is a Java API, a REST API and a Node.js API which wraps the REST API and adds IBM specific functionality like administration and easier authentication.

The Node.js sample is a simple chat application. It uses the Node module cfenv to access the Bluemix environment variables and the Node module message-hub-rest to access Kafka. I modified the sample slightly to 1. separate the consumer from the producer and to 2. remove the chat sample application. Below is the minimal code for a producer. Tomorrow I’ll blog more about the consumer.

You can run the producer either locally or on Bluemix. To run it on Bluemix, create a Bluemix Node.js application, add the Message Hub service and execute these commands from the project’s root directory.

cf login
cf push <mykafkaproducer>
curl http://mykafkaproducer.mybluemix.net

To run the producer locally, execute these commands.

npm install
node app.js <message_hub_rest_endpoint> <message_hub_api_key>
curl http://localhost:6003

In order to test whether it works, deploy the sample chat application and connect to see the messages from your producer. Both the chat application and your producer application need to use the same Message Hub service.

Image title


  "name": "node-kafka-producer",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "start": "node app.js"
  "dependencies": {
    "express": "4.12.x",
    "cfenv": "1.0.x",
    "message-hub-rest": "^1.0.1"
  "repository": {},
  "engines": {
    "node": "0.12.x"


 * Copyright 2015 IBM
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
var express = require('express');
var app = express();

var Cfenv = require('cfenv');
var MessageHub = require('message-hub-rest');
var appEnv = Cfenv.getAppEnv();
var instance;
var cleanedUp = false;
var topic = 'livechat';

app.listen(appEnv.port, '', function() {
  console.log("server starting on " + appEnv.url);

app.get('/', function (req, res) {
  pushMessage("Hello World!");
  res.send('Hello World!');

var start = function(restEndpoint, apiKey, callback) {
  if(!appEnv.services || (appEnv.services && Object.keys(appEnv.services).length === 0)) {
    if(restEndpoint && apiKey) {
      appEnv.services = {
        "messagehub": [
              "label": "messagehub",
              "credentials": {
                 "api_key": apiKey,
                 "kafka_rest_url": restEndpoint,
    } else {
      console.error('A REST Endpoint and API Key must be provided.');
  } else {
    console.log('Endpoint and API Key provided have been ignored, as there is a valid VCAP_SERVICES.');

  instance = new MessageHub(appEnv.services);

      .then(function(response) {
        console.log('topic created');
      .fail(function(error) {

var pushMessage = function(message) {
    var list = new MessageHub.MessageList();
    var message = {
      user: "Niklas",
      message: message,


    instance.produce(topic, list.messages)
      .fail(function(error) {
        throw new Error(error);

var registerExitHandler = function(callback) {
  if(callback) {
    var events = ['exit', 'SIGINT', 'uncaughtException'];

    for(var index in events) {
      process.on(events[index], callback);
  } else if(!callback) {
    throw new ReferenceException('Provided callback parameter is undefined.');

// Register a callback function to run when
// the process exits.
registerExitHandler(function() {

var stop = function(exitCode) {
  exitCode = exitCode || 0;

  if(!cleanedUp) {
    console.log('Running exit handler.');
    cleanedUp = true;

// If this module has been loaded by another module, don't start
// the service automatically. If it's being started from the command license
// (i.e. node app.js), start the service automatically.
if(!module.parent) {
  if(process.argv.length >= 4) {
    start(process.argv[process.argv.length - 2], process.argv[process.argv.length - 1]);
  } else {

module.exports = {
  start: start,
  stop: stop,
  appEnv: appEnv

The Integration Zone is proudly sponsored by CA Technologies. Learn from expert microservices and API presentations at the Modernizing Application Architectures Virtual Summit Series.

kafka ,bluemix ,node js

Published at DZone with permission of Niklas Heidloff, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}