State Transitions With Spring Integration
In this article, look at how to implement a non-blocking state machine with the Spring Integration framework.
Join the DZone community and get the full member experience.
Join For FreeIn a previous article, I presented a framework for a simple state machine. In a follow-up article, I customized the framework for non-blocking processes using callback functions. In this article, I propose an approach to implementing a non-blocking state machine with the Spring Integration framework. The Spring Integration framework provides many communication mechanisms between applications and components. I will be using just the Message Channels for my example. I'll also use Spring Boot to drive the state transitions.
Order Processing Example: State Transitions
I consider a simple order processing application where a customer creates an order and then pays for the order. The first step in implementing a state machine is to write the state transitions for the application. I will assume that the following are the allowable state transitions for the above order scenario.
Initial State | Pre-Event | Processor | Post-Event | Final State |
Default | create | orderProcessor() | orderCreated | PaymentPending |
PaymentPending | pay | paymentProcessor() | paymentError | PaymentErrorEmailSent |
PaymentErrorEmailSent | retryPay | paymentProcessor() | paymentSuccess | PaymenSuccessEmailSent |
PaymentPending | pay | paymentProcessor() | paymentSuccess | PaymenSuccessEmailSent |
I'll further assume that the orderProcessor()
is a blocking process and the paymentProcessor()
is a non-blocking process.
Configuring States and Events
The next step is to configure the states and events identified above. I use Java enums to configure the states and events.
The states are configured like:
//ProcessState is a marker interface
public interface ProcessState {
}
public enum OrderState implements ProcessState {
DEFAULT,
PAYMENTPENDING,
PAYINPROGRESS,
PAYMENTERROREMAILSENT,
REPAYINPROGRESS,
PAYMENTSUCCESSEMAILSENT
}
Note that I have added two additional states - PAYINPROGRESS and RETRYPAYINPROGRESS corresponding to the two pre-events PAY and RETRYPAY. This will help us in validating the pre-events that might arrive before the long running non-blocking process completes.
The events are then configured like:
xxxxxxxxxx
//ProcessEvent is a framework class
public interface ProcessEvent {
public abstract String getChannelName();
public abstract long getTimeout();
public abstract ProcessState nextState();
public abstract String getMessage();
}
public enum OrderEvent implements ProcessEvent {
CREATE {
public String getChannelName() {
return "createChannel";
}
/**
* This event has no effect on state so return current state
*/
public ProcessState nextState() {
return OrderState.DEFAULT;
}
public String getMessage() {
return "Order submitted";
}
public long getTimeout() {
//by setting the timeout a negative value
//we are making this a blocking message communication.
return -1;
}
},
ORDERCREATED {
/**
* This post-event does not trigger any process So return null
*/
public String getChannelName() {
return null;
}
public ProcessState nextState() {
return OrderState.PAYMENTPENDING;
}
public String getMessage() {
return "Order created, payment pending";
}
public long getTimeout() {
//by setting the timeout to zero
//we are making this a non-blocking message communication.
return 0;
}
},
PAY {
public String getChannelName() {
return "payChannel";
}
/**
* This event has no effect on state so return current state
*/
public ProcessState nextState() {
return OrderState.PAYMENTPENDING;
}
public String getMessage() {
return "We are processing your payment, please check your email for the order confirmation number";
}
public long getTimeout() {
//by setting the timeout to zero
//we are making this a non-blocking message communication.
return 0;
}
},
RETRYPAY {
/**
* This post-event does not trigger any process So return null
*/
public String getChannelName() {
return "payChannel";
}
public ProcessState nextState() {
return OrderState.PAYMENTERROREMAILSENT;
}
public String getMessage() {
return "We are processing your payment, please check your email for the order confirmation number";
}
public long getTimeout() {
return 0;
}
},
PAYMENTSUCCESS {
/**
* This post-event does not trigger any process So return null
*/
public String getChannelName() {
return null;
}
public ProcessState nextState() {
return OrderState.PAYMENTSUCCESSEMAILSENT;
}
public String getMessage() {
return "Payment success, email sent";
}
public long getTimeout() {
return 0;
}
},
PAYMENTERROR {
/**
* This post-event does not trigger any process So return null
*/
public String getChannelName() {
return null;
}
public ProcessState nextState() {
return OrderState.PAYMENTERROREMAILSENT;
}
public String getMessage() {
return "Payment processing error, email sent";
}
public long getTimeout() {
return 0;
}
}
}
Order Processing Components
The order processing components are shown in the following diagram...
...where I have also shown the message channels used. The Spring Integration framework provides Messaging Gateway components. However, the custom facade components shown above offer more flexibility for the state machine implementation.
Persisting State
An in-memory H2 database is used to persist the order states with the following configuration:
xxxxxxxxxx
public class OrderDbService {
private JdbcTemplate jdbcTemplate;
public OrderDbService() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("jdbc:h2:mem:testdb");
dataSource.setUsername("SA");
dataSource.setPassword("");
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
public String getOrderState(UUID uuid) {
String sql = "select state from ORDER_STATE where uuid='"+uuid+"'";
String res;
try {
res = jdbcTemplate.queryForObject(sql, String.class);
}catch(Exception e) {
res="";
}
return res;
}
public void saveState(UUID uuid, String state) {
String sql;
if(!this.getOrderState(uuid).isEmpty()) {
sql = "update ORDER_STATE set state='"+state+"' where uuid='"+uuid+"'";
} else {
sql = "insert into ORDER_STATE(uuid,state) values('"+uuid+"','"+state+"')";
}
jdbcTemplate.execute(sql);
}
}
A schema.sql file is used to create the table:
xxxxxxxxxx
DROP TABLE IF EXISTS ORDER_STATE;
CREATE TABLE ORDER_STATE(
UUID VARCHAR(50) PRIMARY KEY,
STATE VARCHAR(256) NOT NULL
);
For brevity of discussion I am not tracking order state history.
Spring Integration Message Channels Configuration
The next step is to configure the message channels. I will be using the xml configuration option as suggested in the Spring Integration sample.
xxxxxxxxxx
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task https://www.springframework.org/schema/task/spring-task.xsd"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:task="http://www.springframework.org/schema/task">
<int:poller default="true" fixed-delay="50" ></int:poller>
<int:channel id="createChannel" ></int:channel>
<int:channel id="postEventHandlerChannel">
<int:queue capacity="10" ></int:queue>
</int:channel>
<int:channel id="payChannel">
<int:queue capacity="10" ></int:queue>
</int:channel>
<int:channel id="syncResponseChannel">
<int:queue capacity="10" ></int:queue>
</int:channel>
<bean id="dbService" class="rnd.statemachine.order.OrderDbService"></bean>
<int:service-activator
input-channel="createChannel" output-channel="syncResponseChannel"
method="process">
<bean class="rnd.statemachine.order.OrderProcessor" ></bean>
</int:service-activator>
<int:service-activator
input-channel="payChannel" output-channel="postEventHandlerChannel"
method="process">
<bean class="rnd.statemachine.order.PaymentProcessor" ></bean>
</int:service-activator>
<int:service-activator
input-channel="postEventHandlerChannel">
<bean class="rnd.statemachine.order.PostEventHandler">
<constructor-arg index="0" ref="dbService" ></constructor>
</bean>
</int:service-activator>
</beans>
OrderStateTransitionsMgrBlocking.java
— a facade class that handles all state transitions that need to be synchronous.
xxxxxxxxxx
public class OrderStateTransitionsMgrBlocking extends AbstractStateTransitionsManager {
private final OrderDbService dbService;
private final AbstractApplicationContext applicationContext;
private OrderStateTransitionsMgrBlocking(AbstractApplicationContext applicationContext, OrderDbService dbService) {
this.applicationContext = new ClassPathXmlApplicationContext("/META-INF/spring/integration/MessageChannelConfig.xml", OrderStateTransitionsMgrBlocking.class);
this.dbService = dbService;
}
protected ProcessData processStateTransition(ProcessData sdata) throws ProcessException {
OrderData data = (OrderData) sdata;
log.info("Initial state: " + (OrderState)((OrderEvent)data.getEvent()).nextState());
log.info("Pre-event: " + data.getEvent().toString());
Message<ProcessData> msg = MessageBuilder.withPayload((ProcessData)data).build();
long timeout = data.getEvent().getTimeout();
this.applicationContext.getBean(data.getEvent().getChannelName(), MessageChannel.class).send(msg, timeout);
data = (OrderData)this.applicationContext.getBean("syncResponseChannel", PollableChannel.class).receive(timeout).getPayload();
this.processPostEvent(data);
return data;
}
private void processPostEvent(ProcessData data) {
log.info("Post-event: " + data.getEvent().toString());
this.dbService.saveState(((OrderData) data).getOrderId(), ((OrderState)((OrderEvent) data.getEvent()).nextState()).name());
log.info("Final state: " + this.dbService.getOrderState(((OrderData) data).getOrderId()));
}
protected boolean hasValidPreState(ProcessData sdata) throws OrderException {
OrderData data = (OrderData) sdata;
OrderState currentState = (OrderState)((OrderEvent) data.getEvent()).nextState();
if (data.getOrderId() == null) {
UUID orderId = UUID.randomUUID();
this.dbService.saveState(orderId, OrderState.DEFAULT.name());
((OrderData) data).setOrderId(orderId);
return true;
} else {
return currentState.name().equals(this.dbService.getOrderState(data.getOrderId()));
}
}
}
Note that blocking is enabled in the above by setting the timeout < 0.
OrderStateTransitionsMgrNonBlocking.java
— a facade class that handles all state transitions that need to be non-blocking.
xxxxxxxxxx
public class OrderStateTransitionsMgrNonBlocking extends AbstractStateTransitionsManager {
private final OrderDbService dbService;
private final AbstractApplicationContext applicationContext;
private OrderStateTransitionsMgrNonBlocking(AbstractApplicationContext applicationContext, OrderDbService dbService) {
this.applicationContext = new ClassPathXmlApplicationContext("/META-INF/spring/integration/MessageChannelConfig.xml", OrderStateTransitionsMgrNonBlocking.class);
this.dbService = dbService;
}
protected ProcessData processStateTransition(ProcessData sdata) throws ProcessException {
OrderData data = (OrderData) sdata;
log.info("Initial state: " + (OrderState)((OrderEvent)data.getEvent()).nextState());
log.info("Pre-event: " + data.getEvent().toString());
Message<ProcessData> msg = MessageBuilder.withPayload((ProcessData)data).build();
long timeout = data.getEvent().getTimeout();
this.applicationContext.getBean(data.getEvent().getChannelName(), MessageChannel.class).send(msg, timeout);
return data;
}
protected boolean hasValidPreState(ProcessData sdata) throws OrderException {
OrderData data = (OrderData) sdata;
String inProgressState = ((OrderEvent) data.getEvent()).name()+"INPROGRESSS";
String expectedState = ((OrderState)((OrderEvent) data.getEvent()).nextState()).name();
String actualState = this.dbService.getOrderState(data.getOrderId());
boolean result = expectedState.equals(actualState);
if(result) {
this.dbService.saveState(data.getOrderId(), inProgressState);
} else if (inProgressState.equals(this.dbService.getOrderState(data.getOrderId()))) {
throw new IllegalStateException("The "+(OrderEvent) data.getEvent()+" event is in progress, please try few minutes later.");
}
return result;
}
}
Note that non-blocking is enabled by setting timeout=0.
OrderProcessor.java
:
xxxxxxxxxx
public class OrderProcessor implements Processor {
public ProcessData process(ProcessData data) {
((OrderData) data).setMessage("Order created, Taxes 12.00, Shipping 123.00, Total 1234.00, orderId = " + ((OrderData) data).getOrderId());
((OrderData) data).setEvent(OrderEvent.ORDERCREATED);
return data;
}
}
PaymentProcessor.java:
xxxxxxxxxx
public class PaymentProcessor implements Processor {
public ProcessData process(ProcessData data) {
try{
//simulate a long running process
Thread.sleep(1000L);
if(((OrderData)data).getPayment() > 0) {
((OrderData)data).setEvent(OrderEvent.PAYMENTSUCCESS);
//TODO: send payment success email
} else {
((OrderData)data).setEvent(OrderEvent.PAYMENTERROR);
//TODO: send payment error email
}
}catch(InterruptedException e){
//TODO: Use a new state transition to include system error
}
return data;
}
}
PostEventHandler.java:
xxxxxxxxxx
public class PostEventHandler {
private final OrderDbService dbService;
public void handlePostEvent(ProcessData data) {
log.info("Post-event: " + data.getEvent().toString());
dbService.saveState(((OrderData) data).getOrderId(),
((OrderState) data.getEvent().nextState()).name());
log.info("Final state: " + this.dbService.getOrderState(((OrderData) data).getOrderId()));
}
}
OrderController.java
- generates pre-events for the state machine.
xxxxxxxxxx
public class OrderController {
private final OrderStateTransitionsMgrBlocking stateTrasitionsMgrBlocking;
private final OrderStateTransitionsMgrNonBlocking stateTrasitionsMgrNonBlocking;
/**
* Quick API to test the payment event
* @param amount
* @param id
* @return
* @throws Exception
*/
"/orders/{id}/payment/{amount}") (
public String handleOrderPayment(
double amount,
UUID id) throws Exception {
OrderData data = new OrderData();
data.setPayment(amount);
data.setOrderId(id);
data.setEvent(OrderEvent.PAY);
data = (OrderData)stateTrasitionsMgrNonBlocking.processPreEvent(data);
return ((OrderEvent)data.getEvent()).getMessage();
}
"/orders/{id}/retrypayment/{amount}") (
public String handleOrderRePayment(
double amount,
UUID id) throws Exception {
OrderData data = new OrderData();
data.setPayment(amount);
data.setOrderId(id);
data.setEvent(OrderEvent.RETRYPAY);
data = (OrderData)stateTrasitionsMgrNonBlocking.processPreEvent(data);
return ((OrderEvent)data.getEvent()).getMessage();
}
value=ProcessException.class) (
public String handleOrderException(ProcessException e) {
return e.getMessage();
}
value=IllegalStateException.class) (
public String handleIllegalStateException(IllegalStateException e) {
return e.getMessage();
}
/**
* API to test the order create event
* @return
* @throws ProcessException
*/
"/order/items") (
public String handleOrderCreate() throws ProcessException {
OrderData data = new OrderData();
data.setEvent(OrderEvent.CREATE);
data = (OrderData)stateTrasitionsMgrBlocking.processPreEvent(data);
return ((OrderData)data).getMessage();
}
}
Full source for this sample application is available on GitHub.
Testing the State Machine
Due to the delayed responses from the non-blocking processes, the requests should be made with appropriate delays. Otherwise, either "Invalid state" or "The PAY/RETRYPAY event is in progress" responses are sent to the customer. The following scenarios can be tested:
Scenario #1: Create Order and make a valid payment(amount>0.00) - Customer receives success email.
Scenario #2: Create an order, make an invalid payment(amount=0.00) and followed by a valid payment without delay - Customer receives "The PAY event is in progress".
Scenario #3: Create an order, make an invalid payment and followed by a valid retry-payment without delay - Customer receives "Invalid state" response.
Scenario #4: Create an order, make an invalid payment and followed by a valid retry-payment with a delay(> 1s) - Customer receives payment success email.
I have included a JMeter test plan file so interested readers can run the above scenarios. The logs display the state transitions.
Conclusions
A simple example for a state machine with blocking and non-blocking processes is presented. It is shown that pre-defined state transitions can be maintained to produce robust applications even when non-blocking processes are introduced. The use of the state machine is found to produce clean and maintainable code. The Spring Integration framework is found to be minimally invasive and enables loose coupling of application components with simple configuration.
Opinions expressed by DZone contributors are their own.
Comments