
Integrating Pub/Sub With MuleSoft

iPaaS tools are built on synchronous RESTful APIs. For your marketing app to be integrated with your CRM app, you need to have appropriate RESTful endpoints.

By Himanshu Gupta · Aug. 12, 21 · Tutorial

One of the benefits of my current job is that I get to work with companies from different industries. As someone who had only worked in financial services before, it has been a very rewarding experience to understand the kinds of challenges companies face in other industries and how they compare to challenges commonly found in financial services. Additionally, I also get to learn about their technology stack.


Recently, I had the opportunity to work with a popular iPaaS (integration platform as a service) solution called MuleSoft, which was acquired by Salesforce. MuleSoft, like other integration solutions, lets you connect your applications by dragging and dropping connectors. Such tools are heavily used to integrate applications from different domains. For example, you can link your CRM app to your marketing app, invoice app, and analytics app with a simple drag-and-drop.

As you can probably guess, iPaaS tools are built on synchronous RESTful APIs. For your marketing app to be integrated with your CRM app, you need to have appropriate RESTful endpoints. While this works fine, synchronous REST calls have their limitations compared with asynchronous calls: the caller has to keep polling for new data. Instead of polling, you would rather have your apps implement event-driven architecture via the pub/sub messaging pattern.

What Are the Advantages of Event-Driven Architecture and Pub/Sub Messaging?

You have undoubtedly heard these terms before because they have become extremely popular in the last few years. Our world is full of events, so your applications need to be event-driven. When a credit card is swiped, an event is fired, which triggers a series of downstream events. For example, your phone gets a real-time notification informing you that your card was just swiped, your credit card balance is updated, and so forth.

I have previously written a post about EDA and its advantages but here are the main ones:

  • Perform analytics and actions on real-time data as opposed to stale data obtained from batch processing.
  • Identify issues in real-time instead of waiting till the batch is executed.
  • Push vs. pull - events are pushed to your applications as opposed to constantly polling for updates.
  • Loosely coupled applications.
  • Easier to scale.
  • Typical pub/sub benefits such as the efficient distribution of data, persistence, replay, and migration to the cloud.

Implementing Pub/Sub With MuleSoft and Solace PubSub+

Now that we have a better idea of why you should consider using pub/sub messaging with your iPaaS to implement event-driven architecture, let's see how to actually do that.

We will be using MuleSoft's Anypoint Studio, which you can download for free (there may be a 30-day limit).

For our broker, we will be using PubSub+ deployed in a Docker container on my laptop, but you can also use PubSub+ Cloud, which has a free tier.

PubSub+ supports open APIs and protocols, so it's really easy to use with other products. For example, to integrate it with MuleSoft, we will be using MuleSoft's JMS module.

Use Case

We have a CRM system such as Salesforce which pushes updates to a REST endpoint whenever there is an account update, such as an account being created. The update contains high-level information about the account itself (name, website, etc.), contact information, contract information, and location information (address). We have downstream applications such as a marketing app, an invoice app, and an analytics app, each interested in one or more of the types of information contained in the full account update.

Here is what our downstream applications are interested in:

  • Analytics app - interested in high-level account and contract information.
  • Invoice app - interested in contract information only.
  • Marketing app - interested in contact and location information.

Our goal is to ingest the original payload (described by an XML schema), parse it, and split it into four smaller payloads:

  • High-level account information.
  • Contact information.
  • Location information.
  • Contract information.

Then, we need to publish this data on different topics and have different consumers subscribe to this data depending on their interest.

Notice that despite having multiple apps subscribing to the same data (both analytics and invoice apps are interested in contract data), we only need to publish it once. This is one of the key benefits of using a pub/sub messaging pattern. While it may not seem like a major benefit in our use case, it definitely makes a difference when you are dealing with high data volumes.
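To make the publish-once idea concrete, here is a minimal sketch of the pattern in Python. The `Broker` class is a hypothetical in-memory toy with exact-match topics only (it is not the PubSub+ API), and the account ID `A-001` is made up for illustration.

```python
from collections import defaultdict


class Broker:
    """Toy in-memory broker: exact-match topics only, no persistence."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        # The publisher sends once; the broker fans out to every subscriber.
        callbacks = self._subscribers[topic]
        for callback in callbacks:
            callback(message)
        return len(callbacks)


# Hypothetical account ID used only for this example.
CONTRACT_TOPIC = "company/sales/salesforce/customerAccount/contract/created/v1/A-001"

analytics_inbox, invoice_inbox = [], []
broker = Broker()
broker.subscribe(CONTRACT_TOPIC, analytics_inbox.append)
broker.subscribe(CONTRACT_TOPIC, invoice_inbox.append)

# One publish call, two deliveries.
delivered = broker.publish(CONTRACT_TOPIC, {"ContractNumber": "123456"})
print(delivered, analytics_inbox, invoice_inbox)
```

The publisher never needs to know how many applications are listening; adding a third subscriber requires no change on the publishing side.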

Additionally, we are able to dynamically filter the dataset using PubSub+'s rich hierarchical topics instead of having all our applications consume the same data and then having to filter themselves.

Schemas and Topics

One of the benefits of using an iPaaS such as MuleSoft is that it allows you to transform your data. In our case, we will be ingesting an XML payload, but the output will be in JSON.

Here is the schema of the original payload (topic: company/sales/salesforce/customerAccount/all/created/v1/{accountId}):

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="AccountId" type="xs:string"/>
  <xs:element name="AccountName" type="xs:string"/>
  <xs:element name="AccountSource" type="xs:string"/>
  <xs:element name="AnnualRevenue" type="xs:int"/>
  <xs:element name="BillingCountryCode" type="xs:string"/>
  <xs:element name="BillingState" type="xs:string"/>
  <xs:element name="CreatedDate" type="xs:dateTime"/>
  <xs:element name="CurrencyIsoCode" type="xs:string"/>
  <xs:element name="IsActive" type="xs:string"/>
  <xs:element name="IsArchived" type="xs:string"/>
  <xs:element name="IsDeleted" type="xs:string"/>
  <xs:element name="LastModifiedById" type="xs:string"/>
  <xs:element name="LastModifiedDate" type="xs:dateTime"/>
  <xs:element name="LastReferencedDate" type="xs:dateTime"/>
  <xs:element name="LastViewedDate" type="xs:dateTime"/>
  <xs:element name="ContactName" type="xs:string"/>
  <xs:element name="ContactId" type="xs:int"/>
  <xs:element name="ContactEmail" type="xs:string"/>
  <xs:element name="Description" type="xs:string"/>
  <xs:element name="Industry" type="xs:string"/>
  <xs:element name="NumberOfEmployees" type="xs:short"/>
  <xs:element name="Type" type="xs:string"/>
  <xs:element name="BillingAddress" type="xs:string"/>
  <xs:element name="Website" type="xs:anyURI"/>
  <xs:element name="ProductCode" type="xs:int"/>
  <xs:element name="ContractNumber" type="xs:int"/>
  <xs:element name="ContractAddress" type="xs:string"/>
  <xs:element name="Sum_Units_Sold__c" type="xs:float"/>
  <xs:element name="SystemModstamp" type="xs:dateTime"/>
</xs:schema>


Here are the schemas for the 4 payloads after they have been parsed:

1. High-level account (topic: company/sales/salesforce/customerAccount/account/created/v1/{accountId}):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "Body": {
      "type": "object",
      "properties": {
        "AccountId": {
          "type": "string"
        },
        "AccountName": {
          "type": "string"
        },
        "Website": {
          "type": "string"
        }
      },
      "required": [
        "AccountId",
        "AccountName",
        "Website"
      ]
    }
  },
  "required": [
    "Body"
  ]
}


2. Contact (topic: company/sales/salesforce/customerAccount/contact/created/v1/{accountId}):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "Body": {
      "type": "object",
      "properties": {
        "ContactName": {
          "type": "string"
        },
        "ContactId": {
          "type": "string"
        },
        "ContactEmail": {
          "type": "string"
        }
      },
      "required": [
        "ContactName",
        "ContactId",
        "ContactEmail"
      ]
    }
  },
  "required": [
    "Body"
  ]
}


3. Location (topic: company/sales/salesforce/customerAccount/location/created/v1/{accountId}):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "Body": {
      "type": "object",
      "properties": {
        "BillingCountryCode": {
          "type": "string"
        },
        "BillingState": {
          "type": "string"
        },
        "BillingAddress": {
          "type": "string"
        }
      },
      "required": [
        "BillingCountryCode",
        "BillingState",
        "BillingAddress"
      ]
    }
  },
  "required": [
    "Body"
  ]
}


4. Contract (topic: company/sales/salesforce/customerAccount/contract/created/v1/{accountId}):

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "Body": {
      "type": "object",
      "properties": {
        "ContractNumber": {
          "type": "string"
        },
        "ContractAddress": {
          "type": "string"
        }
      },
      "required": [
        "ContractNumber",
        "ContractAddress"
      ]
    }
  },
  "required": [
    "Body"
  ]
}
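The split described by these four schemas can be sketched in a few lines of Python. This is only an illustration of the mapping, not what MuleSoft does internally: the function name `split_account_update`, the input dictionary, and the account ID `A-001` are assumptions, while the field lists come from the `required` arrays above.

```python
TOPIC_PREFIX = "company/sales/salesforce/customerAccount"

# Fields per event type, taken from the "required" lists in the schemas above.
FIELDS_BY_EVENT = {
    "account": ["AccountId", "AccountName", "Website"],
    "contact": ["ContactName", "ContactId", "ContactEmail"],
    "location": ["BillingCountryCode", "BillingState", "BillingAddress"],
    "contract": ["ContractNumber", "ContractAddress"],
}


def split_account_update(update):
    """Return {topic: payload} with one entry per event type."""
    account_id = update["AccountId"]
    events = {}
    for event, fields in FIELDS_BY_EVENT.items():
        missing = [name for name in fields if name not in update]
        if missing:
            raise KeyError(f"{event} payload is missing {missing}")
        topic = f"{TOPIC_PREFIX}/{event}/created/v1/{account_id}"
        # The JSON schemas type every field as a string, so coerce explicitly.
        events[topic] = {"Body": {name: str(update[name]) for name in fields}}
    return events


# Hypothetical parsed account update (AccountId is made up).
sample_update = {
    "AccountId": "A-001",
    "AccountName": "Solace",
    "Website": "www.solace.com",
    "ContactName": "Himanshu",
    "ContactId": 2309402,
    "ContactEmail": "himanshu@solace.com",
    "BillingCountryCode": "US",
    "BillingState": "NY",
    "BillingAddress": "535 Legget Drive, 3rd Floor, Ottawa, Canada",
    "ContractNumber": 123456,
    "ContractAddress": "535 Legget Drive, 3rd Floor, Ottawa, Canada",
}

events = split_account_update(sample_update)
for topic in events:
    print(topic)
```

Fields that appear in the original payload but in none of the four schemas (for example `AnnualRevenue`) are simply dropped by this sketch.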


Phew, that's a lot of information to keep track of. We need to know our downstream applications, which events they are interested in, and what the schemas will be for those events. If only there were a design tool that allowed us to do that!

PubSub+ Event Portal

The event-driven gods have heard your plea and granted your wish! Solace recently launched a new product called Event Portal, which is "the market’s first event management toolset to design, create, discover, catalog, share, visualize, secure and manage all the events in your enterprise."

I used it to design the flow of our events and here is what it looks like:

Flow of Events Design

As you can see, we have our Salesforce app at the top publishing the original event containing raw payload which is then parsed by MuleSoft and published to PubSub+ for downstream applications to consume.

We can also create specific events and associate schemas with them in the Event Portal. For example, as you can see from the image above, our MuleSoft app is publishing the accountContractUpdates event, which the InvoicingSystem application subscribes to. Expanding that event shows us who created it, which topic it is published to, what the associated schema is, and which applications publish and subscribe to it.

Viewing Specific Event Details

Again, while this may not seem very impressive right now, imagine how useful Event Portal would be to an organization with hundreds of applications and thousands of events!

MuleSoft Implementation

Now that we have our events documented, we can start implementing them.

Here is what my Mule workspace looks like:

Author's Mule Workspace

It consists of two flows: Publisher and Consumer.

The Publisher flow has an HTTP listener on my endpoint localhost:5001/solace. When I issue a POST request against this endpoint, the listener captures the payload and passes it to four JMS Publish modules, which publish four different events to four different topics.

Here are the connection settings for the JMS Publish module:

<jms:config name="Solace" doc:name="JMS Config" doc:id="635cb4d0-a727-4723-8d5f-6da38b806745" >
        <jms:generic-connection username="mule" password="mule">
            <jms:connection-factory >
                <jms:jndi-connection-factory connectionFactoryJndiName="/jms/cf/default">
                    <jms:custom-jndi-name-resolver >
                        <jms:simple-jndi-name-resolver jndiInitialFactory="com.solacesystems.jndi.SolJNDIInitialContextFactory" jndiProviderUrl="tcp://mule:mule@localhost:55555">
                            <jms:jndi-provider-properties >
                                <jms:jndi-provider-property key="Solace_JMS_VPN" value="default" />
                            </jms:jndi-provider-properties>
                        </jms:simple-jndi-name-resolver>
                    </jms:custom-jndi-name-resolver>
                </jms:jndi-connection-factory>
            </jms:connection-factory>
        </jms:generic-connection>
</jms:config>


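And here is the config XML for one of the JMS Publish modules. Note that this particular module emits XML (output application/xml) and omits AccountId from the body, so its output differs from the JSON account schema shown earlier: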

<jms:publish doc:name="Publish Account to Solace" doc:id="1a24df7f-c33d-4ca1-a551-0d097d85bb65" config-ref="Solace" destination='#[output text
ns ns0 http://schemas.xmlsoap.org/soap/envelope/
---
"company/sales/salesforce/customerAccount/account/created/v1/" ++ payload.ns0#Envelope.ns0#Body.AccountId]' destinationType="TOPIC">
            <jms:message >
                <jms:body ><![CDATA[#[output application/xml
ns ns0 http://schemas.xmlsoap.org/soap/envelope/
---
{
    ns0#Envelope: {
        ns0#Body: {
            AccountName: payload.ns0#Envelope.ns0#Body.AccountName,
            Website: payload.ns0#Envelope.ns0#Body.Website
        }
    }
}]]]></jms:body>
            </jms:message>
</jms:publish>
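For readers less familiar with DataWeave, the destination expression above reads AccountId from the SOAP body and appends it to a fixed topic prefix. A rough Python equivalent, using only the standard library, might look like this. The function name `account_destination` and the sample envelope (including the account ID `A-001`) are assumptions for illustration.

```python
import xml.etree.ElementTree as ET

SOAP_NS = {"ns0": "http://schemas.xmlsoap.org/soap/envelope/"}
TOPIC_PREFIX = "company/sales/salesforce/customerAccount/account/created/v1/"


def account_destination(xml_text):
    """Build the account topic from the AccountId in a SOAP envelope."""
    envelope = ET.fromstring(xml_text)
    # Envelope and Body are namespaced; AccountId is not.
    account_id = envelope.findtext("ns0:Body/AccountId", namespaces=SOAP_NS)
    if account_id is None:
        raise ValueError("AccountId not found in SOAP body")
    return TOPIC_PREFIX + account_id


# Hypothetical request body; only the shape is taken from the Mule config.
SAMPLE = """<ns0:Envelope xmlns:ns0="http://schemas.xmlsoap.org/soap/envelope/">
  <ns0:Body>
    <AccountId>A-001</AccountId>
    <AccountName>Solace</AccountName>
    <Website>www.solace.com</Website>
  </ns0:Body>
</ns0:Envelope>"""

print(account_destination(SAMPLE))
```

Putting the account ID in the last topic level is what later lets a subscriber filter on a single account without inspecting the payload.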


I have a second flow called Consumer, which has a simple JMS listener bound to a queue, followed by a logger and a JMS ack module for acknowledging messages manually.

Here is its config:

<flow name="Consumer" doc:id="e705a269-7978-40c5-a0d2-7279567278ad" >
        <jms:listener doc:name="Listener" doc:id="2c76ddb5-07ff-41aa-9496-31897f19d378" config-ref="Solace" destination="mule" ackMode="MANUAL">
            <jms:consumer-type >
                <jms:queue-consumer />
            </jms:consumer-type>
        </jms:listener>
        <logger level="INFO" doc:name="Logger" doc:id="cb53890f-9bc7-4bad-8512-55586aa61e8d" />
        <jms:ack doc:name="Ack" doc:id="c2b197a8-2fcb-4954-b748-071ffde36da5" ackId="#[%dw 2.0
output application/java
---
attributes.ackId]"/>
</flow>
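The flow uses ackMode="MANUAL", meaning a message is only considered done once the flow explicitly acknowledges it. The idea can be sketched with a toy in-memory queue. The `ManualAckQueue` class below is purely hypothetical and much simpler than real JMS or PubSub+ behaviour; it only illustrates why an unacknowledged message is not lost.

```python
import itertools


class ManualAckQueue:
    """Toy queue: a message stays pending until it is explicitly acknowledged."""

    def __init__(self, messages):
        self._ready = list(messages)
        self._pending = {}  # ack_id -> delivered but unacknowledged message
        self._ids = itertools.count(1)

    def receive(self):
        message = self._ready.pop(0)
        ack_id = next(self._ids)
        self._pending[ack_id] = message
        return ack_id, message

    def ack(self, ack_id):
        # Acknowledging removes the message for good.
        del self._pending[ack_id]

    def recover(self):
        # Unacknowledged messages go back to the front for redelivery.
        self._ready = list(self._pending.values()) + self._ready
        self._pending.clear()

    def depth(self):
        return len(self._ready) + len(self._pending)


queue = ManualAckQueue(["account", "contact"])

first_id, first = queue.receive()
queue.recover()  # simulate the consumer failing before it could ack

second_id, second = queue.receive()  # the same message is delivered again
queue.ack(second_id)
print(first, second, queue.depth())
```

In the Mule flow, the logger runs between delivery and the ack step, so a failure there would leave the message on the queue rather than silently dropping it.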


You will need to create a queue for the consumer to bind to. In my case, I have created a queue called mule using Solace's UI and added a topic subscription to it: company/sales/salesforce/customerAccount/>

Creating a Queue for Consumer

Notice that instead of specifying an exact topic, I have used Solace's wildcard > to select any topic that falls under that hierarchy. In our case, subscribing to this topic allows us to consume all four events.
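To see how a wildcard subscription selects topics, here is a simplified matcher in Python. It is a sketch of the matching rules as I understand them, not Solace's implementation: `>` as the last level matches one or more remaining levels, and a level ending in `*` matches any single level with that prefix. The function name `topic_matches` and the account ID `A-001` are assumptions.

```python
def topic_matches(subscription, topic):
    """Simplified Solace-style matching: '>' (last level) and trailing '*'."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, sub in enumerate(sub_levels):
        if sub == ">" and i == len(sub_levels) - 1:
            # '>' needs at least one remaining level in the topic.
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        if sub.endswith("*"):
            # 'cont*' matches 'contact' and 'contract'; '*' matches any level.
            if not topic_levels[i].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[i]:
            return False
    return len(sub_levels) == len(topic_levels)


BASE = "company/sales/salesforce/customerAccount"
contract = f"{BASE}/contract/created/v1/A-001"  # hypothetical account ID
contact = f"{BASE}/contact/created/v1/A-001"
location = f"{BASE}/location/created/v1/A-001"

print(topic_matches(f"{BASE}/>", contract))
print(topic_matches(f"{BASE}/contract/created/v1/*", contact))
print(topic_matches(f"{BASE}/cont*/created/v1/*", contact))
```

With rules like these, the invoice app could subscribe to only the contract level while the queue in this post uses > to receive everything under customerAccount.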

Now we are ready to test our setup. Using Postman, I have sent a POST request:

Testing Setup With Postman
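If you prefer a script to Postman, an equivalent request can be built with Python's standard library. The exact body I sent is only visible in the screenshot, so the envelope below is an assumption inferred from the DataWeave expressions (a SOAP Envelope whose Body holds the account fields), and `build_request` and the account ID `A-001` are hypothetical.

```python
import urllib.request
from xml.sax.saxutils import escape

ENDPOINT = "http://localhost:5001/solace"


def build_request(account_id, account_name, website):
    """Build (but do not send) a POST carrying a minimal account update."""
    body = (
        '<ns0:Envelope xmlns:ns0="http://schemas.xmlsoap.org/soap/envelope/">'
        "<ns0:Body>"
        f"<AccountId>{escape(account_id)}</AccountId>"
        f"<AccountName>{escape(account_name)}</AccountName>"
        f"<Website>{escape(website)}</Website>"
        "</ns0:Body>"
        "</ns0:Envelope>"
    )
    return urllib.request.Request(
        ENDPOINT,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/xml"},
        method="POST",
    )


request = build_request("A-001", "Solace", "www.solace.com")
print(request.get_method(), request.full_url)

# With the Mule app running locally, send it with:
# urllib.request.urlopen(request)
```

A full test would include the contact, location, and contract fields as well, since the other three Publish modules read them from the same payload.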

And here is the output of my Mule flow:

INFO  2020-05-21 13:02:55,528 [[MuleRuntime].uber.03: [solace].Consumer.CPU_LITE @85a5ceb] [processor: Consumer/processors/0; event: ea2e4270-9b84-11ea-9fe8-a483e79ba806] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload=<?xml version='1.0' encoding='UTF-8'?>
<ns0:Envelope xmlns:ns0="http://schemas.xmlsoap.org/soap/envelope/">
  <ns0:Body>
    <AccountName>Solace</AccountName>
    <Website>www.solace.com</Website>
  </ns0:Body>
</ns0:Envelope>
  mediaType=application/xml; charset=UTF-8
  attributes=org.mule.extensions.jms.api.message.JmsAttributes@460bec31
  attributesMediaType=application/java
}
INFO  2020-05-21 13:02:55,536 [[MuleRuntime].uber.03: [solace].Consumer.CPU_LITE @85a5ceb] [processor: Consumer/processors/0; event: ea2e4270-9b84-11ea-9fe8-a483e79ba806] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload={
  "Envelope": {
    "Body": {
      "ContactName": "Himanshu",
      "ContactId": "2309402",
      "ContactEmail": "himanshu@solace.com"
    }
  }
}
  mediaType=application/json; charset=UTF-8
  attributes=org.mule.extensions.jms.api.message.JmsAttributes@395abcae
  attributesMediaType=application/java
}
INFO  2020-05-21 13:02:55,566 [[MuleRuntime].uber.03: [solace].Consumer.CPU_LITE @85a5ceb] [processor: Consumer/processors/0; event: ea2e4270-9b84-11ea-9fe8-a483e79ba806] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload={
  "Envelope": {
    "Body": {
      "ContractNumber": "123456",
      "ContractAddress": "535 Legget Drive, 3rd Floor, Ottawa, Canada"
    }
  }
}
  mediaType=application/json; charset=UTF-8
  attributes=org.mule.extensions.jms.api.message.JmsAttributes@6a4910db
  attributesMediaType=application/java
}
INFO  2020-05-21 13:02:55,574 [[MuleRuntime].uber.03: [solace].Consumer.CPU_LITE @85a5ceb] [processor: Consumer/processors/0; event: ea2e4270-9b84-11ea-9fe8-a483e79ba806] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload={
  "Envelope": {
    "Body": {
      "BillingCountryCode": "US",
      "BillingState": "NY",
      "BillingAddress": "535 Legget Drive, 3rd Floor, Ottawa, Canada"
    }
  }
}
  mediaType=application/json; charset=UTF-8
  attributes=org.mule.extensions.jms.api.message.JmsAttributes@130e6e64
  attributesMediaType=application/java
}


As you can see, the single consumer in our Consumer Mule flow was able to consume all the messages from our queue (mule), which has just one wildcard topic subscription.

Of course, to fully leverage the power of pub/sub messaging, you can have multiple consumers, each using Solace's rich hierarchical topics to consume only the events it is interested in.

Wrap Up

That's it for this post! I wanted to show you how you can make iPaaS tools such as MuleSoft even more powerful by adding pub/sub messaging to them. With Solace PubSub+, you can easily use the JMS standard to connect to it and publish/consume messages.

If you work with an iPaaS, I would highly recommend considering PubSub+ to make your applications event-driven. PubSub+ Standard Edition is free to use, even in production!
