
Mule’s New Batch Module and Polling with Watermark


A lot of new features have been introduced in the Mule 3.5 release. In this post I briefly explain how four of these new features can be used:
  • The batch module
  • The watermark and cron scheduler on the poll message processor
  • The new database connector

Then I show how to apply these new features to achieve similar functionality to that described in a previous blog post.

Batch Module

The batch module lets us design a Mule application to do batch processing in a simpler and more intuitive fashion.

In the new batch module, a batch job is divided into four phases: Input, Load and Dispatch (performed automatically by Mule), Process, and On Complete.

These are explained below.

Input Phase

As the name implies, the input phase is the part of the batch job where the data is loaded and prepared for processing by the batch module. Since a batch job can also be invoked with ‘batch:execute’ (even from a flow), the input phase is optional. This is similar to the behaviour of (private) flows, where the message source can be another flow, a batch job, or an inbound endpoint.
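For instance, here is a minimal sketch of kicking off a batch job from a flow (the flow name, HTTP endpoint, and job name are all hypothetical):

<!-- hypothetical flow; assumes a batch job named "myBatchJob" is defined elsewhere -->
<flow name="triggerBatchFlow">
	<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="trigger"/>
	<batch:execute name="myBatchJob"/>
</flow>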

Load and Dispatch Phase

The “load and dispatch” phase is performed automatically by Mule: each item in the payload is turned into a record and placed on a queue for the process phase.

Process Phase

The process phase is divided into a series of batch steps. Each batch step can contain multiple message processors (outbound endpoints, flow references, Java components, etc.), and each step can be configured to run depending on whether a record succeeded or failed in the previous steps, as shown in the sketch below.
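For example, a minimal sketch of a process phase with two steps, where the second step only runs for records that failed earlier (the step names are hypothetical):

<batch:process-records>
	<batch:step name="MainStep">
		<!-- runs for every record that has not failed so far -->
		<logger level="INFO" message="Processing record: #[message.payload]"/>
	</batch:step>
	<batch:step name="FailuresStep" accept-policy="ONLY_FAILURES">
		<!-- runs only for records that failed in a previous step -->
		<logger level="ERROR" message="Record failed: #[message.payload]"/>
	</batch:step>
</batch:process-records>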

On Complete Phase

The “On Complete” phase exposes the results of the batch job; these results can be used for reporting.

Further information about the batch module can be found on the MuleSoft blog and in the Mule documentation.

Poll Message Processor

The poll processor has been improved to support defining a cron expression, as follows (this example fires once every minute):

<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
</poll>

It is also important to note that if you are going to use a fixed polling frequency, this must now be defined in a sub-element (called fixed-frequency-scheduler) instead of as an attribute on the poll processor itself:

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/>
</poll>

Watermark

Watermark enables us to extract a certain value (for example, the largest ID) from the polled items and save this value to an object store (if no object store is defined, the watermark automatically creates and uses one).

To use this feature we need to specify either a selector together with a selector-expression, or an update-expression.

The selector attribute can take MIN, MAX, FIRST, or LAST. The selector-expression holds the expression used to extract the required element, for example #[message.payload['Id']].

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" selector="MAX" selector-expression="#[payload['id']]" />
</poll>

On the other hand, the update-expression lets us define a custom expression that updates the watermark value (this would usually be a variable we set in the flow):

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />
</poll>
...
<set-variable variableName="myId" value="#[payload[payload.size()-1]['id']]" />

Note: when debugging in Anypoint Studio, the watermark value is retained between different executions of the application because, by default, it is persisted in a persistent object store. If this is not the desired behaviour during development, you can switch the watermark to an in-memory object store, or change the run configuration to clear application data.
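As a rough sketch, assuming the watermark's object-store-ref attribute and Mule's in-memory SimpleMemoryObjectStore (the bean name here is hypothetical), switching to a memory-backed store could look like this:

<spring:beans>
	<!-- hypothetical bean name; stored values are lost when the application stops -->
	<spring:bean id="inMemoryObjectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>

<poll>
	<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES"/>
	<watermark variable="Id" default-expression="#[0]" selector="MAX"
		selector-expression="#[payload['id']]" object-store-ref="inMemoryObjectStore"/>
	...
</poll>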

The New Database Connector

Mule now features a new database connector with big improvements over the previous JDBC connector, including streaming for selects, bulk mode for inserts, support for auto-generated keys, DataSense support, and better support for dynamic queries.
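As a quick sketch of that last point (assuming the database configuration defined further down; the flow variables here are hypothetical), a parameterized query binds MEL expressions as SQL parameters, while a dynamic query interpolates them into the SQL text itself:

<!-- parameterized: #[...] becomes a bound SQL parameter -->
<db:select config-ref="Generic_Database_Configuration">
	<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 > #[flowVars['lastId']]]]></db:parameterized-query>
</db:select>

<!-- dynamic: #[...] is evaluated and spliced into the SQL string -->
<db:select config-ref="Generic_Database_Configuration">
	<db:dynamic-query><![CDATA[SELECT key1, key2 FROM #[flowVars['tableName']]]]></db:dynamic-query>
</db:select>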

Revisiting Large Dataset Retrieval in Mule with the New Mule Features

In this section I describe a couple of ways to do batch jobs with the new database connector and the watermark feature.

In the first example I show how to define a batch job that polls at specified intervals for the next batch of values. A limitation of this approach is that it does not retrieve all the values available in the database in a single run, which matters especially when using cron. The second example handles this.

In order to start creating the batch job, we first need to define the Spring bean for the data source (in this case an embedded Derby data source).


<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>

Then we need to specify the database configuration as follows:

<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSource"/>

Having specified the database configuration we can start creating the batch job.

First we need to define our input phase. This consists of a poll element (in which we can specify either a cron expression or a fixed frequency) wrapping a database select.

To use a fixed frequency, specify the polling element as follows:

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/>
</poll>

To use a cron scheduler, specify the polling element as follows:

<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
</poll>

Now we can define the query using the new database connector. With the new database connector there are three ways to do a Select query:

  1. ‘Parameterized’
  2. ‘Dynamic’
  3. ‘From Template’

In this case we use a ‘Parameterized’ query since our select statement does not need to be dynamic, and since we are not going to reuse the query in our application.

Our poll element should now look similar to the following:

<poll doc:name="Poll">
	<fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/>
	<db:select config-ref="Generic_Database_Configuration">
	<db:parameterized-query><![CDATA[SELECT KEY1, KEY2 FROM table1 ]]></db:parameterized-query>
	</db:select>
</poll>

Since we want the database query to fetch only the next items in the database, we can use the watermark; in this case we specify a selector and a selector-expression.

The “selector” attribute can take MIN, MAX, FIRST, or LAST. In this example we use MAX, since the number auto-generated by the database is in ascending order. The selector-expression takes the expression to get the required element, in this case #[message.payload['key1']].

We should also define the default-expression (the expression evaluated on the first run) as well as the variable that holds the watermark result (in this example we name it ‘Id’).

This should look similar to the following:

<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />

Furthermore, we should amend our query to select only the elements whose key is greater than the value of the ‘Id’ variable (here, the next ten at a time).

<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>

The poll element should now look like this:

			<poll doc:name="Poll">
				<fixed-frequency-scheduler frequency="1"
					timeUnit="HOURS" />
				<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />
				<db:select config-ref="Generic_Database_Configuration"
					doc:name="Database">
					<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
				</db:select>
			</poll>

Now that we have the input phase defined we can start defining the processing phase.

As shown previously, each item in the list is transformed into a record during the ‘Load and Dispatch’ phase, and each record is then processed in the process phase in a series of one or more batch steps.

We first need to group these records with a batch commit; the size attribute defines the number of records we want committed in each batch. Inside the batch commit we define the database insert endpoint with a parameterized query. Since we want the records to be inserted in bulk, we set the bulkMode attribute on the database endpoint to true.

	<batch:commit doc:name="Batch Commit" size="10">
		<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" doc:name="Copy_of_Database">
			<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
		</db:insert>
	</batch:commit>

After this batch step we can also define a batch step that handles failed records only (notice the accept-policy); for the sake of simplicity we just use a logger.

			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>

As the processing phase is now defined, the only missing piece is the ‘on complete’ phase.

As described previously, the on-complete phase is used to gather the results of the batch we processed. Here we use loggers to demonstrate the feature; in real-world applications these values can of course drive much richer reporting.

		<batch:on-complete>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>

The following is the complete flow for this example:

<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>
	<db:generic-config name="Generic_Database_Configuration"
		dataSource-ref="dataSource" />

	<batch:job name="batch-example-1">
		<batch:threading-profile poolExhaustedAction="WAIT" />
		<batch:input>
			<poll doc:name="Poll">
				<fixed-frequency-scheduler frequency="1"
					timeUnit="HOURS" />
				<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />
				<db:select config-ref="Generic_Database_Configuration"
					doc:name="Database">
                    <db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
				</db:select>
			</poll>
		</batch:input>
		<batch:process-records>
			<batch:step name="Batch_Step">
				<batch:commit doc:name="Batch Commit" size="10">
					<db:insert config-ref="Generic_Database_Configuration" bulkMode="true">
						<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
					</db:insert>
				</batch:commit>
			</batch:step>
			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>
		</batch:process-records>
		<batch:on-complete>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>
	</batch:job>

Now that we have seen how easily this can be implemented, we can move on and discuss how to refine it to replicate the functional behaviour of the “Large Dataset Retrieval in Mule” blog post.

There are three major differences between the “Large Dataset Retrieval in Mule” blog post implementation and the implementation above:

  1. The Id is not reset to '0' after the whole batch completes.
  2. We do not poll again immediately afterwards, but wait for the next scheduled poll.
  3. An HTTP endpoint is used to start the batch.

For the first requirement we need to change the watermark from using a selector expression to using an update-expression instead.

For the second requirement, we need:

  • To make sure that the next poll starts immediately after the previous one completes
  • To make sure that no other poll starts before the current poll completes.

The third requirement can be achieved by stopping and starting the batch input phase; however, that is beyond the scope of this blog post.

The first requirement is easy to implement; we just need a couple of simple changes.

The first change is to modify the watermark as follows:

<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />

We also need to set “myId” to '0' when no records are left to be processed, and to the largest key when further records exist:

			<choice>
				<when expression="#[payload.size() > 0]">
					<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" />
				</when>
				<otherwise>
					<set-variable variableName="myId" value="#[0]" doc:name="set id" />
				</otherwise>
			</choice>

We also need to modify the database query to order on key1 in ascending order, and to filter for keys between the current Id and the Id plus the number of items we want to fetch:

<db:select config-ref="Generic_Database_Configuration">
						<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
					</db:select>

The second requirement is a little trickier to implement. For this requirement we need to make sure that another poll does not start while the current poll is running; to do this, we stop the scheduler just after the database select completes.

We also need to manually trigger a further poll whenever the current result contained records to process. When there are no further records, we need to make sure the scheduler is started again so that it keeps polling automatically at the specified interval.

This requires a bit of custom Java code that looks up the scheduler and gives us the ability to stop, start, and schedule the poller.

This is the Java code I’ve used for this:

package com.ricston.blog.example;

import java.util.Collection;
import org.mule.api.MuleContext;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.schedule.Scheduler;
import org.mule.modules.schedulers.cron.CronScheduler;
import org.mule.transport.polling.schedule.FixedFrequencyScheduler;
import org.mule.util.Predicate;

public class SchedulerWrapper {

	protected Scheduler getScheduler(MuleEventContext eventContext) {
		MuleContext context = eventContext.getMuleContext();
		final String flowName = eventContext.getFlowConstruct().getName();

		// Get the poll scheduler so that we can stop, start, and schedule the poller.
		Collection<Scheduler> pollSchedulers = context.getRegistry()
				.lookupScheduler(new Predicate<String>() {
					@Override
					public boolean evaluate(String s) {
						// The scheduler name is polling:// + flow/batch job name + /...
						return s.startsWith("polling://" + flowName + "/");
					}
				});

		// There should be only one scheduler
		if (pollSchedulers.size() == 1) {
			Scheduler pollScheduler = pollSchedulers.iterator().next();
			return pollScheduler;
		} else {
			throw new IllegalStateException("Was expecting one scheduler but there were:" + pollSchedulers.size());
		}

	}

	protected void stopScheduler(Scheduler scheduler) throws MuleException {
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.stop();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.stop();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was "
							+ scheduler.getClass());
		}
	}
	
	protected void startScheduler(Scheduler scheduler) throws MuleException {
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.start();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.start();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
		}
	}

	
	protected void scheduleScheduler(Scheduler scheduler) throws Exception{
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.schedule();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.schedule();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
		}
		
	}

}

Please note that this assumes there is only one scheduler whose name starts with “polling://” plus the batch job name followed by ‘/’; the lookup can be refined further by using a regular expression.

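As a rough sketch, the lookup inside getScheduler() could instead match the scheduler name against a regular expression (same naming assumption as above):

		// Match "polling://<flowName>/" followed by anything; Pattern.quote()
		// treats any regex metacharacters in the flow name literally.
		final java.util.regex.Pattern schedulerName = java.util.regex.Pattern
				.compile("^polling://" + java.util.regex.Pattern.quote(flowName) + "/.*");

		Collection<Scheduler> pollSchedulers = context.getRegistry()
				.lookupScheduler(new Predicate<String>() {
					@Override
					public boolean evaluate(String s) {
						return schedulerName.matcher(s).matches();
					}
				});

The scheduler is then stopped, started, and scheduled from the flow through three thin Callable wrappers:
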
package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class StartScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		
		Scheduler scheduler = this.getScheduler(eventContext);
		this.startScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}
}

package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class StopScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		
		Scheduler scheduler = this.getScheduler(eventContext);
		this.stopScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}

}

package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class ScheduleScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		Scheduler scheduler = getScheduler(eventContext);
		this.scheduleScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}

}

The following is the whole batch job for the second example:

<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>
	<db:generic-config name="Generic_Database_Configuration"
		dataSource-ref="dataSource" doc:name="Generic Database Configuration" />

	<batch:job name="batch-example-two">
		<batch:input>
			<poll>
				<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
				<watermark variable="Id" default-expression="#[0]"
					update-expression="#[flowVars['myId']]" />
					<db:select config-ref="Generic_Database_Configuration"
						doc:name="Database">
						<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
					</db:select>
			</poll>
			<component class="com.ricston.blog.examples.StopScheduler"
				doc:name="Stop scheduler" />
			<choice>
				<when expression="#[payload.size() > 0]">
					<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" doc:name="set id" />
				</when>
				<otherwise>
					<set-variable variableName="myId" value="#[0]" doc:name="set id" />
				</otherwise>
			</choice>

		</batch:input>
		<batch:process-records>
			<batch:step name="Batch_Step">
				<batch:commit size="10">
					<db:insert config-ref="Generic_Database_Configuration"
						doc:name="Database" bulkMode="true">
						<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
					</db:insert>
				</batch:commit>
			</batch:step>
			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>
		</batch:process-records>
		<batch:on-complete>
			<logger level="INFO" doc:name="Logger" message="Stopping Inputblock" />
			<choice>
				<when expression="#[payload.getProcessedRecords() > 0]">
					<component class="com.ricston.blog.example.ScheduleScheduler"
						doc:name="Schedule Scheduler" />
					<component class="com.ricston.blog.example.StartScheduler"
						doc:name="Start Scheduler" />
				</when>
				<otherwise>
					<!-- get scheduler running -->
					<component class="com.ricston.blog.example.StartScheduler"
						doc:name="Start Scheduler" />
				</otherwise>

			</choice>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>
	</batch:job>
I hope you enjoyed this blog post.
