
How to Build a Streaming Application in Mule 4



We recently announced the GA release of Mule 4 and Studio 7, a major evolution of the core runtime behind Anypoint Platform. Over the past few months, we've discussed DataWeave, streaming, and building flows quickly and easily. Recently, we demoed these capabilities in the Mule 4 webinar, and we wanted to share this demo through this article!

Imagine that you are the developer for a marketing team: you are tasked with creating a project that will pull the emails of all the customers that have ordered a specific product and send it to the email marketing team. Today, we will be building a flow that processes a list of users, their emails, and their order history, and then exports that information into a separate file. The goal is that upon deploying this flow, you will be able to take the input (a list of records) and parse it for emails. Then you'll write the emails to a separate file while separately figuring out which record has a certain order type and writing those records to separate files.
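Before diving into the steps, here is a sketch of the shape the finished flow will take (element and config names here are illustrative; the XML that Studio generates for you will differ):

<flow name="streaming-demo-flow">
    <!-- Receive the list of records over HTTP -->
    <http:listener config-ref="HTTP_Listener_config" path="/"/>

    <!-- Thread 1: write every customer email to a single file -->
    <async>
        <file:write config-ref="File_Config" path="connect-out/contacts.txt" mode="APPEND"/>
    </async>

    <!-- Thread 2: write each record that has an order to its own file -->
    <foreach collection="#[payload.customers]">
        <choice>
            <when expression="#[payload.order != null]">
                <file:write config-ref="File_Config" path="#['connect-out/output$(vars.counter).csv']"/>
            </when>
        </choice>
    </foreach>
</flow>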

Step 1:

Open Studio 7 and create a new project.


Step 2:

Drag, drop, and configure the HTTP listener.

For the purposes of this demo, the app should run on your local machine only. This means you will configure the host to localhost and the port to 9090.
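In XML, the listener configuration will look something like this (the config names below are the Studio defaults; yours may differ):

<http:listener-config name="HTTP_Listener_config">
    <http:listener-connection host="localhost" port="9090"/>
</http:listener-config>

<flow name="streaming-demo-flow">
    <http:listener config-ref="HTTP_Listener_config" path="/" doc:name="Listener"/>
    <!-- the rest of the flow goes here -->
</flow>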


Step 3:

Pull in the async block and then drag the "file" connector with the "write" operation.

Now, you will accomplish your first task: pulling all the emails from your list of records and writing them out to a separate file.

This is where the magic of DataWeave comes in. We are going to create an inline expression that takes in the payload and selects only the "email" field. That way, we can ignore the format of the data and get right to the meat: the email field! The DataWeave transformation should look like:

#[%dw 2.0
output application/csv
---
payload.customers map {
    "email": $.email
}]

Here, the output directive tells DataWeave how to serialize the file contents, which are produced by mapping each customer's email. Note that we did not write or create a double-nested loop to achieve this: DataWeave understands that the incoming payload has an internal series of lists from which we should pull a specific piece of information (the emails).

Now, double check the XML configuration; it should look something like this:

<async doc:name="Async">
    <file:write config-ref="File_Config" doc:name="Write" path="connect-out/contacts.txt" mode="APPEND">
        <file:content><![CDATA[#[%dw 2.0
output application/csv
---
payload.customers map {
    "email": $.email
}]]]></file:content>
    </file:write>
</async>


Why the async block? We wanted to show that an incoming stream can be consumed in parallel by two threads without being corrupted.

Step 4:

Now that the async block is done, you could deploy the application as it is. But you want to create a parallel thread, one that looks at each record and evaluates whether or not someone has placed an order. If they have placed an order, you want to create a separate file about that person.

If you remember, the input file is a list of lists, which would normally require a double-nested loop to iterate through. That leaves a lot of room for error, especially if the stream is interrupted.

So, let's pull in the forEach scope. Within this scope, create a conditional with the "choice router." Here, you'll use DataWeave to assess if each record has an order. If it has an order, then write the entire record out to a separate file. If it doesn't have an order, meaning that the "order" component is null, then no action is required.
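In XML, the scope and router combination has roughly the following shape (a sketch; no "otherwise" branch is needed because records without an order require no action):

<foreach doc:name="For Each" collection="#[payload.customers]">
    <choice doc:name="Choice">
        <when expression="#[payload.order != null]">
            <!-- The record has an order: write it out to its own file -->
            <file:write config-ref="File_Config" doc:name="Write"
                path="#['connect-out/output$(vars.counter).csv']"/>
        </when>
    </choice>
</foreach>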

Now, let's first set up the choice router, then you can resolve and configure it appropriately.

Your choice router executes only if the record has a valid order:

#[payload.order != null]

Then, write the output so that each record becomes its own file. You can define the file path with DataWeave: #['connect-out/output$(vars.counter).csv']

Let's break this down:

The "connect-out" part of the DataWeave expression refers to the output location: a folder in the general file structure called connect-out. Every file written into this folder will have the prefix "output" followed by the record number, going from 1 all the way to "n," depending on how many records the flow goes through. The "vars.counter" portion supplies the number, so the folder will contain something like: output1.csv, output2.csv, ..., output491.csv, etc.
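For example, on the third iteration of the forEach scope, where vars.counter is 3, the expression evaluates like this:

#['connect-out/output$(vars.counter).csv']  // with vars.counter = 3
// evaluates to "connect-out/output3.csv"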

In the event there is no order, then no action is required.

After the above step, the XML will look like the following:

<when expression="#[payload.order != null]">
    <file:write config-ref="File_Config" doc:name="Write"
        path="#['connect-out/output$(vars.counter).csv']"/>
</when>
Now you can run the application and test it.

I am using a JSON file full of fake records as the input. It has more than a thousand records that look something like:

      {
        "order": "179",
        "firstName": "Walter",
        "lastName": "Doe",
        "email": "waldoe@email.com",
        "gender": "Male",
        "address": ""
      }

Note: It is important to define the content-type in the header.
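For example, a request that triggers this flow might look like the following (assuming the listener path is "/" and the records are wrapped in a top-level "customers" array, as the DataWeave expressions above expect):

POST / HTTP/1.1
Host: localhost:9090
Content-Type: application/json

{ "customers": [ { "order": "179", "firstName": "Walter", "lastName": "Doe", "email": "waldoe@email.com", "gender": "Male", "address": "" } ] }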

Now run the application and observe the results.

What does this mean? This demo shows the following:

  1. Streaming: Streaming has been significantly enhanced with the introduction of Mule 4. The stream of data that flows from the HTTP listener is consumed in parallel via the "async" block and the "forEach" block.
  2. Seamless access to data: There are no transformations defined at any point in the flow. We use expressions that refer to the structure of the input data, but we do not have to transform the input into a digestible format; this happens automatically.
  3. DataWeave: DataWeave is embedded in the connectors, so the stream of data flowing through each component suffers no side effects, such as data breaking the transformations or hanging midstream.

This demo shows that while the async block is running and writing a full list of customer emails to a TXT file, the rest of the flow is running in parallel on a separate thread. The result of the demo is that there is no conflict in execution.

Secondly, in Mule 3, all operations relied on the state of the Mule Message. If you had a piece of data that you wanted to transform and save, you had to run the DataWeave transformation and then write the output to the file. With that approach, you could not revert to the original format of the data after writing the content to the file: the transformation became permanent unless you had duplicated and saved the original data separately.

Lastly, this example demonstrates best practices on the following:

  • Embedding DataWeave inside operations.
  • Using DataWeave to iterate over data — regardless of the format of the data.
  • Writing outputs to files without permanently affecting the state of the data in the original input.

This demo offers a glimpse of some of the innovations that Mule 4 provides its users. MuleSoft's upcoming Summits provide an excellent opportunity for you to learn more about Mule 4 through interactive demos, breakout technical deep dives, training courses, and more.

