Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Mule ESB and Microsoft Azure Service Bus

DZone's Guide to

Mule ESB and Microsoft Azure Service Bus

This article outlines a few ways of integrating Mule ESB with the MS Azure Service Bus Queues.

· Integration Zone ·
Free Resource

The Future of Enterprise Integration: Learn how organizations are re-architecting their integration strategy with data-driven app integration for true digital transformation.

Introduction

This article outlines a few ways of integrating Mule ESB with the MS Azure Service Bus Queues.

At the time of compiling this article, the following are the MS Azure cloud offerings with respect to the queues:

1. Azure Storage Queues

  • Azure Queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS
  • It’s an “Azure Storage Infrastructure” cloud offering

2. Azure Service Bus*

  • Microsoft Azure Service Bus is a reliable information delivery service. When two or more parties want to exchange information, they need a communication facilitator. Service Bus is a brokered, or third-party communication mechanism
  • It’s an “Azure Integration Space” cloud offering

Since it is an “integration space” Azure cloud queue offering, here are few ways in which we can leverage Azure Service Bus with the Mule ESB for "Writing a message."

As always for queues, the following things are in play:

  • Writing a message to the queue.
  • Reading a message from the queue. (Queue readers can be independent of ESB. So this does not require Mule ESB as shown below)

3. Writing a Message to the Queue

  1. Using Azure Java SDK
  2. Using Azure Service Bus REST APIs

3. Using Mule Azure Service Bus Connector ( Not covered in this article)

4. Reading a Message From the Queue (Does Not Require Mule ESB at All) 

  1. “Classic way” using Azure Java SDK — A Simple Java Class, which registers a callback
  2. Azure function Apps (“Uber Cool” way)

5. A Few Things to Consider

  1. Examples are compiled and executed in the Anypoint Studio Version 6.3.0 with Java 8
  2. May require free/pay-as-go/premium subscription for Azure Function Apps, Azure SQL Server, and Azure Service bus and queues
  3. The details for the following are omitted for brevity and out of scope for this article:
    • How to create Function apps and configure them.
    • How to procure and set-up Azure SQL Server
    • How to create an Azure service bus
    • Set up Azure Subscriptions and Resource groups
    • How to write flows in Anypoint Studio and Mule flows, basics on how to use HTTP and Java components, and how to deploy flows in the Studio integrated Mule runtime.

Implementation — Writing a Message

1.1 Writing a Message — Using Azure Java SDK

How to: We can integrate using Mule “Java Component” in any flow.

High-level approach:

  • Add Azure Java SDK jars as a maven dependency

2 Types of Implementations:

  • Sync

  • Async (using Java Futures)

Mule Design View:

Image title

Configuration XML: (Leverage Mule Java Component — Sync)

<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>


    <flow name="mule-azure-service-bus-java-compFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/java" doc:name="HTTP" allowedMethods="POST"/>
        <logger level="INFO" doc:name="Logger" message="Sending Message"/>
        <component class="com.utility.message.azure.MessageSender" doc:name="Java"/>
        <logger message="Send Message - Java - Success" level="INFO" doc:name="Logger"/>
    </flow>


Configuration XML: (Leverage Mule Java Component — Async)

    <flow name="mule-azure-service-bus-java-threaded">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/java/threaded" allowedMethods="POST" doc:name="HTTP"/>
        <logger level="INFO" doc:name="Logger" message="Recieving Message"/>
        <component class="com.utility.message.azure.MessageSenderThread" doc:name="Java"/>
        <logger level="INFO" doc:name="Logger" message="Send Message - java threaded - Success"/>
    </flow>


Source Code for Java Mule Component — Sync

package com.utility.message.azure;

import java.io.UnsupportedEncodingException;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.springframework.beans.factory.annotation.Value;

import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;


public class MessageSender implements Callable {
 @Value("${servicebus.connstring}")
    private String connectionString;

 @Value("${servicebus.queue}")
    private String queueName;

@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
     // Create a QueueClient instance for sending and then asynchronously send messages.
        // Close the sender once the send operation is complete.
        QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
       sendMessage(sendClient,eventContext.getMessageAsString(),eventContext.getSession().getId());
       return "true";
}

  private void sendMessage(QueueClient sendClient,String msg,String msgID) throws UnsupportedEncodingException, InterruptedException, ServiceBusException {
      Message message = new Message(msg.getBytes("UTF-8"));
          message.setContentType("application/json");
          message.setLabel(msgID);
          message.setMessageId(msgID);
          sendClient.send(message);
  }
}

Source Code for Java Mule Component — Async

package com.utility.message.azure;

import java.io.UnsupportedEncodingException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.springframework.beans.factory.annotation.Value;


public class MessageSenderThread implements Callable {
 @Value("${servicebus.connstring}")
    private String connectionString;

 @Value("${servicebus.queue}")
    private String queueName;

@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
     // Create a QueueClient instance for sending and then asynchronously send messages.
        // Close the sender once the send operation is complete.
        QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
        this.sendMessagesAsync(sendClient,eventContext.getMessageAsString(),eventContext.getSession().getId()).thenRunAsync(() -> sendClient.closeAsync());
return "true";
}

  CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient,String incomingMessage,String messageID) throws UnsupportedEncodingException {


        List<CompletableFuture> tasks = new ArrayList<>();
            final String messageId = "222";
            Message message = new Message(incomingMessage.getBytes("UTF-8"));
            message.setContentType("application/json");
            message.setLabel(messageID);
            message.setMessageId(messageId);
            //message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                    }));

        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }


}

Mule Application Properties (config.properties)

servicebus.connstring=Endpoint=sb://<your-queue-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-key-name>;SharedAccessKey=<your-key-value>
servicebus.queue=<your-queue-name>
servicebus.url=https://<your-queue-namespace>.servicebus.windows.net
servicebus.keyname=<your-key-name>
servicebus.accesskey=<your-key-value>

Substitute your own Azure Service Bus credentials in the following places:

<your-queue-namespace> , <your-key-name>, <your-key-value> and <your-queue-name>

With everything in place, run the above Mule Application.

Fire up your favorite rest client and hit the endpoint. In Postman, it may look like this.

Sync

Image title Async

Image title

Check in the MS Azure Service Bus Portal.

Image title

1.2 Writing a Message — Using Azure Service Bus REST APIs

How to: We can integrate using mule “Http Connector” and "Java component" in any flow. 

High-level approach:

Add Azure Java SDK jars as a maven dependency for generating Azure Tokens using Java class.

Call Azure Token Generator class from the mule Java Component.

Mule Design View:

Image title

Configuration XML: (Leverage Mule Java Component and Http Connector)

  <flow name="mule-azure-service-bus-http">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/azure/messages/http" allowedMethods="POST" doc:name="HTTP"/>
        <logger level="INFO" doc:name="Logger" message="Recieving Message"/>
        <set-variable variableName="inputPayload" value="#[payload]" doc:name="Variable" encoding="UTF-8" mimeType="application/json"/>
        <component class="com.utility.connectivity.azure.AzureTokenComponent" doc:name="Java"/>
        <set-property propertyName="Authorization" value="#[payload]" doc:name="Set Auth Header"/>
        <set-variable variableName="messageID" value="#[new java.util.Random().nextInt(100)]" doc:name="Variable"/>
        <set-property propertyName="BrokerProperties" value="{&quot;Label&quot;:#[flowVars.messageID],&quot;TimeToLiveTimeSpan&quot;:&quot;&quot;}" doc:name="Property"/>
        <dw:transform-message doc:name="Transform Message">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
flowVars.inputPayload]]></dw:set-payload>
        </dw:transform-message>
        <http:request config-ref="HTTP_Request_Configuration" path="/testq/messages" method="POST" doc:name="HTTP"/>
        <logger level="INFO" doc:name="Logger" message="Send Message - http - Success"/>
    </flow>

Source Code for Java Azure Token Generator

package com.utility.message.azure.utility;


import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class AzureTokenGenerator {
public static String generateToken(String resourceUri, String keyName, String key)
  {
      long epoch = System.currentTimeMillis()/1000L;
      int week = 60*60*24*7;
      String expiry = Long.toString(epoch + week);

      String sasToken = null;
      try {
          String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
          String signature = getHMAC256(key, stringToSign);
          sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") +"&sig=" +
                  URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
          System.out.println("SAS Token:"+sasToken);
      } catch (UnsupportedEncodingException e) {

          e.printStackTrace();
      }

      return sasToken;
  }


public static String getHMAC256(String key, String input) {
    Mac sha256_HMAC = null;
    String hash = null;
    try {
        sha256_HMAC = Mac.getInstance("HmacSHA256");
        SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
        sha256_HMAC.init(secret_key);
        java.util.Base64.Encoder encoder = Base64.getEncoder();

        hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

    } catch (InvalidKeyException e) {
        e.printStackTrace();
    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
    } catch (IllegalStateException e) {
        e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }

    return hash;
}
public static void main(String args[]) {
generateToken("<your-queue-uri>","<your-key-name>","<your-key-value>");
}



}

To unit-test the java class, substitute your own Azure Service Bus credentials in the following places:

<your-queue-uri>, <your-key-name>, <your-key-value>

Source Code for Java Mule Component

package com.utility.connectivity.azure;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.springframework.beans.factory.annotation.Value;

import com.utility.message.azure.utility.AzureTokenGenerator;;

public class AzureTokenComponent implements Callable {
 @Value("${servicebus.url}")
    private String aurl;

 @Value("${servicebus.keyname}")
    private String akeyname;

 @Value("${servicebus.accesskey}")
    private String akey;

@Override
public String onCall(MuleEventContext eventContext) throws Exception {
// TODO Auto-generated method stub

String authHeader = AzureTokenGenerator.generateToken(aurl,akeyname,akey);
return authHeader;
}

}

With everything in place, run the above Mule Application.

Fire up your favorite rest client and hit the endpoint. In Postman, it may look like this.

Image title

Verify if the message is posted in the MS Azure Service Bus Portal.

Image title

Implementation — Reading a Message

1.1 “Classic Way” Using Azure Java SDK — A Java Class

How to: We can read queues using simple Java program which registers a callback.

The callback will run where there is a message on the queue.

High-level approach:

Add Azure Java SDK jars , Apache Commons CLI, SL4J libraries as maven dependencies.

Register a callback for the Azure queue client.

Source Code for Java Queue Reader

package com.azure.util.queue;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;

public class AzureQueueReader {
static final Logger logger = LoggerFactory.getLogger(AzureQueueReader.class);

    String ConnectionString = null;
    String QueueName = null;

    public void run() throws Exception {

        // Create a QueueClient instance for receiving using the connection string builder
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the queue
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
        this.registerReceiver(receiveClient);

        // wait for ENTER or 10 seconds elapsing
        waitForEnter(10);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }


    void registerReceiver(QueueClient queueClient) throws Exception {

        // register the RegisterMessageHandler callback
        queueClient.registerMessageHandler(new IMessageHandler() {
                                               // callback invoked when the message handler loop has obtained a message
                                               public CompletableFuture<Void> onMessageAsync(IMessage message) {
                                                   // receives message is passed to callback
//                                                   if (message.getLabel() != null &&
//                                                           message.getContentType() != null) {

                                                       byte[] body = message.getBody();
                                                       String msgBody = new String(body);
                                                       System.out.printf(
                                                               "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                                                                       "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tLabel = \"%s\", \n\t\t\t\t\t\t Body = \"%s\"",
                                                               message.getMessageId(),
                                                               message.getSequenceNumber(),
                                                               message.getEnqueuedTimeUtc(),
                                                               message.getExpiresAtUtc(),
                                                               message.getContentType(),
                                                               message.getLabel(),
                                                               msgBody
                                                              );



//                                                   }
                                                   return CompletableFuture.completedFuture(null);
                                               }

                                               // callback invoked when the message handler has an exception to report
                                               public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                                                   System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                                               }
                                           },
                // 1 concurrent call, messages are auto-completed, auto-renew duration
                new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));

    }

    public static void main(String[] args) {
    AzureQueueReader app = new AzureQueueReader();
            try {
                app.runApp(args);
                app.run();
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
            }
          //  System.exit(0);
    }


    public void runApp(String[] args) {
        try {
            // parse connection string from command line
            Options options = new Options();
            options.addOption(new Option("c", true, "Connection string"));
            options.addOption(new Option("q", true, "Queue Name"));
            CommandLineParser parser = new BasicParser();
            CommandLine cl = parser.parse(options, args);
            if (cl.getOptionValue("c") != null && cl.getOptionValue("q") != null) {
                ConnectionString = cl.getOptionValue("c");
                QueueName =  cl.getOptionValue("q");
            }
            else
            {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp("run jar with", "", options, "", true);
            }
        } catch (Exception e) {
            System.out.printf("%s", e.toString());
        }
    }

    private void waitForEnter(int seconds) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.invokeAny(Arrays.asList(() -> {
                System.in.read();
                return 0;
            }));
        } catch (Exception e) {
            // absorb
        }
    }

}

With everything in place - Run the Java application from the command line:

This java program requires 2 arguments for command line execution.

c - Connection String

q - Queue Name

In Unix like systems, here is the command to run:

Copy all dependencies required for the Java class in the lib folder

java -cp "lib/*:AzureServiceBusUtil-0.0.1-SNAPSHOT.jar" com.azure.util.queue.AzureQueueReader  -c "<your-queue-conn-string>" -q "<your-queue-name>"
  • <your-queue-conn-string> Generally it will be in the format like "Endpoint=sb://<your-queue-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-key-name>;SharedAccessKey=<your-key-value>"

Substitute your own credentials for the following:
<your-queue-namespace>, <your-key-name> and <your-key-value>

  • <your-queue-name>Generally it will be in the format like "<your-queue-name>"

Fire up your favorite rest client and Pump a message to queue. In postman it may look like this.

Image title

Check the logs of the Java application:
It should print a message with statement "Message received"

Image title

1.2 Azure Function Apps (“Uber Cool” Way)

  • At the time of writing this article, I could not find native support in "Function Apps" using Java, which can integrate with Azure Service Bus.
  • So instead of using Java, I used node.js code to demonstrate how can we "read a message from a queue" and put into Cloud SQL Server Database.
  • What this function app does is the following:
    • Reads a message from the Azure Service Bus Queue (it is configured as trigger)
    • Writes a message into Azure SQL Server table.

Azure Function App

Image title

Function App "Integrate" Tab Settings

Check the Trigger is configured as Azure Queue Connection. Image title

Source Code for Queue Reader in "Azure Function App" using node.js.

var Connection = require('tedious').Connection;
var Request = require('tedious').Request
var TYPES = require('tedious').TYPES;


module.exports = function (context, myQueueItem) {
    context.log('JavaScript queue trigger function processed work item:', myQueueItem);
    context.log('Correlation Data:', myQueueItem.correlationData);

var _currentData = {};

    var config = {
        userName: '<your-uname>',
        password: '<your-password>',
        server: '<your-azure-db-host>',
        options: {encrypt: true, database: 'trialdb'}
    };

    var connection = new Connection(config);

  connection.on('connect', function(err) {
        context.log("Connected");
        insertStatement(myQueueItem.correlationData);
    });


    function selectQuery() {

        request = new Request("SELECT * FROM dbo.s_table;", function(err) {
        if (err) {
            context.log(err);}
        });

        request.on('row', function(columns) {
            _currentData.id = columns[0].value;
            _currentData.payload = columns[1].value;;
            context.log(_currentData);
        });

        request.on('requestCompleted', function () {
                    connection.execSql(request);
        });
    }
    function insertStatement(payload) {
        context.log('Insert statement started');
                context.log('Payload'+payload);

        request = new Request("INSERT INTO DBO.S_TABLE(payload) VALUES(@payl);",function(err) {
        if (err) {
            context.log(err);}
        });
        request.addParameter('payl', TYPES.NVarChar,payload);  

        request.on('row', function(columns) {
            _currentData.id = columns[0].value;
            context.log(_currentData);
        });

        connection.execSql(request);

        context.log('Insert statement completed successfully');

    }

    context.done();
};

Substitute these values with your own Azure credentials.

<your-uname>, <your-password> , <your-azure-db-host>

With everything in place, let's test this out!

Let's pump a message into the queue using REST client. If it is Postman, it may be like below:

Image title

Check the logs of Azure Function App for message arrival.

Image title

Check the Azure SQL Server table.

Image title

So, this wraps up this article for both reading and writing messages.

Mahalo

Make your mark on the industry’s leading annual report. Fill out the State of API Integration 2019 Survey and receive $25 to the Cloud Elements store.

Topics:
mule 3.8 ,azure ,azure service bus ,service bus ,integration

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}