A Non-Blocking State Machine
Learn more about non-blocking state machines in this quick post!
In a previous article, I presented a simple state machine for Spring Boot projects and mentioned that the framework is easy to customize for new requirements. In this article, I illustrate that customization for a situation where one or more processors need to be non-blocking.
I'll use the same order processing example as in the previous article, but with one changed requirement: the PaymentProcessor is a long-running process and therefore needs to be non-blocking.
State Transitions
The state transitions for the non-blocking order processing example can be written like:
Initial State | Pre-Event | Processor | Post-Event | Final State |
DEFAULT | submit | OrderProcessor | orderCreated | PMTPENDING |
PMTPENDING | pay | PaymentProcessor | paymentError | PMTERROREMAILPENDING |
PMTERROREMAILPENDING | errorEmail | PaymentErrorEmailProcessor | pmtErrorEmailSent | PMTPENDING |
PMTPENDING | pay | PaymentProcessor | paymentSuccess | PMTSUCCESSEMAILPENDING |
PMTSUCCESSEMAILPENDING | successEmail | PaymentSuccessEmailProcessor | pmtSuccessEmailSent | COMPLETED |
The above states and events are configured in Java enums like:
// OrderState.java
public enum OrderState implements ProcessState {
    Default,
    paymentErrorEmailPending,
    PaymentPending,
    paymentSuccessEmailPending,
    Completed;
}

// OrderEvent.java
public enum OrderEvent implements ProcessEvent {
    submit {
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return OrderProcessor.class;
        }
        /**
         * This event has no effect on state so return current state
         */
        public ProcessState nextState() {
            return OrderState.Default;
        }
        public String getMessage() {
            return "Order submitted";
        }
    },
    orderCreated {
        /**
         * This event does not trigger any process
         * So return null
         */
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return null;
        }
        public ProcessState nextState() {
            return OrderState.PaymentPending;
        }
        public String getMessage() {
            return "Order created, payment pending";
        }
    },
    pay {
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return PaymentProcessor.class;
        }
        /**
         * This event has no effect on state so return current state
         */
        public ProcessState nextState() {
            return OrderState.PaymentPending;
        }
        public String getMessage() {
            return "We are processing your payment, please check your email for the order confirmation number";
        }
    },
    paymentSuccess {
        /**
         * This event does not trigger any process
         * So return null
         */
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return null;
        }
        public ProcessState nextState() {
            return OrderState.paymentSuccessEmailPending;
        }
        public String getMessage() {
            return "Payment success, processing success email";
        }
    },
    paymentError {
        /**
         * This event does not trigger any process
         * So return null
         */
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return null;
        }
        public ProcessState nextState() {
            return OrderState.paymentErrorEmailPending;
        }
        public String getMessage() {
            return "Payment processing error, processing error email";
        }
    },
    errorEmail {
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return PaymentErrorEmailProcessor.class;
        }
        /**
         * This event has no effect on state so return current state
         */
        public ProcessState nextState() {
            return OrderState.paymentErrorEmailPending;
        }
        public String getMessage() {
            return "Payment error, processing error email";
        }
    },
    successEmail {
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return PaymentSuccessEmailProcessor.class;
        }
        /**
         * This event has no effect on state so return current state
         */
        public ProcessState nextState() {
            return OrderState.paymentSuccessEmailPending;
        }
        public String getMessage() {
            return "Payment processing success, processing success email";
        }
    },
    paymentErrorEmailSent {
        /**
         * This event does not trigger any process
         * So return null
         */
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return null;
        }
        public ProcessState nextState() {
            return OrderState.PaymentPending;
        }
        public String getMessage() {
            return "Payment processing error, email sent";
        }
    },
    paymentSuccessEmailSent {
        /**
         * This event does not trigger any process
         * So return null
         */
        public Class<? extends AbstractProcessor> nextStepProcessor() {
            return null;
        }
        public ProcessState nextState() {
            return OrderState.Completed;
        }
        public String getMessage() {
            return "Payment processing success, order completed";
        }
    };
}
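To see how the per-event nextState() values drive the order state, here is a minimal sketch that replays a sequence of events against a trimmed-down copy of the enums. MiniState, MiniEvent, and MiniWalker are hypothetical names used only for this illustration; processors and messages are left out, so this is not the framework's own code.

```java
import java.util.List;

// Hypothetical, trimmed-down counterpart of OrderState.
enum MiniState {
    Default, PaymentPending, paymentErrorEmailPending, paymentSuccessEmailPending, Completed
}

// Hypothetical counterpart of OrderEvent: each event only knows the state it leads to.
enum MiniEvent {
    submit(MiniState.Default),
    orderCreated(MiniState.PaymentPending),
    pay(MiniState.PaymentPending),
    paymentError(MiniState.paymentErrorEmailPending),
    errorEmail(MiniState.paymentErrorEmailPending),
    paymentErrorEmailSent(MiniState.PaymentPending),
    paymentSuccess(MiniState.paymentSuccessEmailPending),
    successEmail(MiniState.paymentSuccessEmailPending),
    paymentSuccessEmailSent(MiniState.Completed);

    private final MiniState next;

    MiniEvent(MiniState next) {
        this.next = next;
    }

    MiniState nextState() {
        return next;
    }
}

final class MiniWalker {
    private MiniWalker() {
    }

    // Applies the events in order and returns the state recorded after the last one.
    static MiniState finalState(List<MiniEvent> events) {
        MiniState state = MiniState.Default;
        for (MiniEvent event : events) {
            state = event.nextState();
        }
        return state;
    }
}
```

Replaying submit, orderCreated, pay, paymentError, errorEmail, paymentErrorEmailSent leaves the order in PaymentPending, which matches the error row pair in the transition table: a failed payment returns the order to the state from which the customer can pay again.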
Asynchronous Non-Blocking Processors
A possible strategy for making a processor non-blocking is to use the Spring framework's @Async annotation in conjunction with a callback function.
The code changes to accommodate this asynchronous strategy are:
// AsyncProcessor.java
import java.util.function.Consumer;

public interface AsyncProcessor {
    void processAsync(ProcessData data, Consumer<ProcessData> consumerFn);
}

// Processor.java
public interface Processor {
    // Returns ProcessData so that the signature matches AbstractProcessor.process
    ProcessData process(ProcessData data);
}

// AbstractProcessor.java
import java.util.function.Consumer;

public abstract class AbstractProcessor implements Processor, AsyncProcessor {
    @Override
    public ProcessData process(ProcessData data) {
        // synchronous subclasses implement the body
        return null;
    }

    @Override
    public void processAsync(ProcessData data, Consumer<ProcessData> consumerFn) {
        // asynchronous subclasses implement the body
    }

    public abstract boolean isAsync();
}
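Here I have introduced an AsyncProcessor interface whose processAsync method accepts a callback function, consumerFn. The AbstractProcessor class enables an implementing processor to be either synchronous or asynchronous, and isAsync() tells the caller which of the two methods to invoke.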
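The sync-or-async choice can be sketched without Spring. In the following illustration, a plain ExecutorService stands in for @Async, String stands in for ProcessData, and every class name (SketchProcessor, UpperCaseProcessor, SlowProcessor, DispatchSketch) is hypothetical. It is only meant to show how a caller can hand the same callback to either kind of processor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for AbstractProcessor, with String as the payload type.
abstract class SketchProcessor {
    String process(String data) {
        return data;
    }

    void processAsync(String data, Consumer<String> callback) {
    }

    abstract boolean isAsync();
}

// A synchronous processor: the caller gets the result directly.
final class UpperCaseProcessor extends SketchProcessor {
    @Override
    String process(String data) {
        return data.toUpperCase();
    }

    @Override
    boolean isAsync() {
        return false;
    }
}

// An asynchronous processor: the work runs on the executor and reports back via the callback.
final class SlowProcessor extends SketchProcessor {
    private final ExecutorService executor;

    SlowProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    void processAsync(String data, Consumer<String> callback) {
        executor.submit(() -> callback.accept(data + "-done"));
    }

    @Override
    boolean isAsync() {
        return true;
    }
}

final class DispatchSketch {
    private DispatchSketch() {
    }

    // Either path ends in the same post-event callback.
    static void dispatch(SketchProcessor processor, String data, Consumer<String> postEvent) {
        if (processor.isAsync()) {
            processor.processAsync(data, postEvent);
        } else {
            postEvent.accept(processor.process(data));
        }
    }

    // Test helper: dispatches once and waits up to five seconds for the callback.
    static String runAndWait(SketchProcessor processor, String data) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        dispatch(processor, data, value -> {
            result.set(value);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }
}
```

The point of the design is that dispatch returns immediately for an asynchronous processor, so the calling thread is free to respond to the user while the callback completes the state transition later.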
An implementing processor like PaymentProcessor can adopt an asynchronous strategy like:
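// PaymentProcessor.java
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

// Assumption: the class-level annotations were lost from this listing. @Service registers
// the bean, and Lombok's @RequiredArgsConstructor injects the final helper field.
@Service
@RequiredArgsConstructor
public class PaymentProcessor extends AbstractProcessor {
    private final PaymentProcessorHelper helper;

    @Override
    public void processAsync(ProcessData data, Consumer<ProcessData> consumerFn) {
        helper.process(data, consumerFn);
    }

    @Override
    public boolean isAsync() {
        return true;
    }
}

// PaymentProcessorHelper.java
import java.util.function.Consumer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

// Assumption: @Service, so that Spring can proxy the @Async method below.
@Service
public class PaymentProcessorHelper {
    /**
     * A long running process
     * @param data
     * @param consumerFn
     */
    @Async("threadPoolTaskExecutor")
    public void process(ProcessData data, Consumer<ProcessData> consumerFn) {
        try {
            //simulate a long running process
            Thread.sleep(2000);
            if (((OrderData) data).getPayment() > 0) {
                ((OrderData) data).setEvent(OrderEvent.paymentSuccess);
            } else {
                ((OrderData) data).setEvent(OrderEvent.paymentError);
            }
        } catch (InterruptedException e) {
            //TODO: Use a new state transition to include system error
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        consumerFn.accept(data);
    }
}

// AppConfig.java
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Assumption: @Configuration, @EnableAsync, and Lombok's @Slf4j (for the log field)
// were lost from this listing.
@Configuration
@EnableAsync
@Slf4j
public class AppConfig implements AsyncConfigurer {
    @Bean(name = "threadPoolTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(20);
        executor.setThreadNamePrefix("StateMachine-");
        executor.initialize();
        return executor;
    }

    /**
     * Catches unhandled exceptions in @Async methods
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                log.error("Exception in the async method {}.{}() {} {}", method.getDeclaringClass().getSimpleName(), method.getName(), ex.getClass().getSimpleName(), ex.getMessage());
            }
        };
    }
}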
Here the PaymentProcessor class is made non-blocking and asynchronous, while all other processors remain synchronous. The source for those classes is omitted for brevity; the complete source is on GitHub. A thread pool management strategy like the one mentioned here is also used.
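The pool settings above (two threads, a queue of 20) put a hard limit on how many payments can be in flight. The following sketch uses the JDK's ThreadPoolExecutor directly, on the assumption that Spring's ThreadPoolTaskExecutor delegates to a JDK pool with the same sizing and the default abort policy. PoolCapacitySketch and its method are hypothetical names for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolCapacitySketch {
    private PoolCapacitySketch() {
    }

    // Submits tasks that block until released and counts how many the pool
    // accepts before it rejects one.
    static int acceptedBeforeRejection(int poolSize, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a long-running payment
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Both threads are busy and the queue is full.
            return accepted;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Under these assumptions, acceptedBeforeRejection(2, 20) reports 22 accepted tasks: two running plus 20 queued. A further payment submitted while the pool is saturated would be rejected, so a production setup would need either a larger queue or an explicit rejection strategy.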
State Transitions Manager
The OrderStateTransitionsManager class takes the form:
// OrderStateTransitionsManager.java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

// Assumption: the class-level annotations were lost from this listing. Lombok's
// @RequiredArgsConstructor injects the final fields and @Slf4j provides the log field.
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderStateTransitionsManager extends AbstractStateTransitionsManager {
    private final ApplicationContext context;
    private final OrderDbService dbService;

    protected void processPostEvent(ProcessData data) {
        log.info("Post-event: " + data.getEvent().toString());
        dbService.getStates().put(((OrderData) data).getOrderId(),
                (OrderState) data.getEvent().nextState());
        log.info("Final state: " + dbService.getStates().get(((OrderData) data).getOrderId()).name());
        log.info("*************************************");
        //if the post-event is either paymentSuccess or paymentError
        //then initiate the email process
        if ((OrderEvent) data.getEvent() == OrderEvent.paymentSuccess) {
            ((OrderData) data).setEvent(OrderEvent.successEmail);
            processPreEvent(data);
        } else if ((OrderEvent) data.getEvent() == OrderEvent.paymentError) {
            ((OrderData) data).setEvent(OrderEvent.errorEmail);
            processPreEvent(data);
        }
    }

    protected ProcessData processStateTransition(ProcessData sdata) throws ProcessException {
        OrderData data = (OrderData) sdata;
        log.info("Pre-event: " + data.getEvent().toString());
        AbstractProcessor processor = this.context.getBean(data.getEvent().nextStepProcessor());
        if (processor.isAsync()) {
            // returns immediately; processPostEvent runs later as the callback
            processor.processAsync(data, this::processPostEvent);
        } else {
            data = (OrderData) processor.process(data);
            processPostEvent(data);
        }
        return data;
    }

    protected void initializeState(ProcessData sdata) throws OrderException {
        OrderData data = (OrderData) sdata;
        //validate state
        checkStateForReturningCustomers(data);
        if ((OrderEvent) data.getEvent() == OrderEvent.submit) {
            UUID orderId = UUID.randomUUID();
            data.setOrderId(orderId);
        }
        dbService.getStates().put(data.getOrderId(), (OrderState) ((OrderEvent) data.getEvent()).nextState());
        log.info("Initial state: " + dbService.getStates().get(data.getOrderId()).name());
    }

    private void checkStateForReturningCustomers(OrderData data) throws OrderException {
        // returning customers must have a state
        if (data.getOrderId() != null) {
            if (this.dbService.getStates().get(data.getOrderId()) == null) {
                throw new OrderException("No state exists for orderId=" + data.getOrderId());
            } else if (this.dbService.getStates().get(data.getOrderId()) == OrderState.Completed) {
                throw new OrderException("Order is completed for orderId=" + data.getOrderId());
            }
        }
    }

    public ConcurrentHashMap<UUID, OrderState> getStates() {
        return dbService.getStates();
    }
}
The complete source is on GitHub.
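The chaining performed in processPostEvent, where the payment result triggers the matching email event, can be traced with a small stand-alone sketch. It assumes a plain single-thread executor in place of @Async and uses strings for events and states; ChainSketch and its methods are hypothetical and only mirror the order of events, not the manager's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ChainSketch {
    private final List<String> trace = new CopyOnWriteArrayList<>();
    private final CompletableFuture<String> finalState = new CompletableFuture<>();

    // Pre-event: record it, then hand the long-running work to the executor.
    void onPay(double amount, ExecutorService executor) {
        trace.add("pay");
        executor.submit(() -> onPostEvent(amount > 0 ? "paymentSuccess" : "paymentError"));
    }

    // Post-event callback: payment results chain into the matching email event.
    void onPostEvent(String event) {
        trace.add(event);
        switch (event) {
            case "paymentSuccess":
                trace.add("successEmail");
                onPostEvent("paymentSuccessEmailSent");
                break;
            case "paymentError":
                trace.add("errorEmail");
                onPostEvent("paymentErrorEmailSent");
                break;
            case "paymentSuccessEmailSent":
                finalState.complete("Completed");
                break;
            case "paymentErrorEmailSent":
                finalState.complete("PaymentPending");
                break;
            default:
                finalState.completeExceptionally(new IllegalStateException(event));
        }
    }

    // Runs one payment and returns the events seen, followed by the final state.
    static List<String> run(double amount) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ChainSketch sketch = new ChainSketch();
        try {
            sketch.onPay(amount, executor);
            String state = sketch.finalState.get(5, TimeUnit.SECONDS);
            List<String> result = new ArrayList<>(sketch.trace);
            result.add(state);
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that onPay returns before the payment result is known; the email events and the final state are all produced on the executor thread, which is the behavior the asynchronous PaymentProcessor is meant to provide.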
Order Processing Demo
The order processing demo includes two APIs that can be used for testing: one to submit an order, for which a synchronous response is returned, and one to submit a payment for the order. When the payment is submitted, the user gets a synchronous message saying that the confirmation number will be sent by email. Payment processing is then performed asynchronously and an email is sent. Here are the requests and responses for the two APIs:
Test #1: Submit order:
$ curl -X POST http://localhost:8080/order/items
Response: "Order submitted, orderId = 1e6092da-3bef-4377-8a02-5a4cb93f4a96"
The logs display the state transitions like:
Initial state: Default
Pre-event: submit
Post-event: orderCreated
Final state: PaymentPending
Test #2: Submit an invalid payment for the order (amount = 0.00):
$ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/0
Response: "We are processing your payment, please check your email for the order confirmation number."
The logs display the state transitions for the above case like:
Initial state: PaymentPending
Pre-event: pay
Post-event: paymentError
Final state: paymentErrorEmailPending
Initial state: paymentErrorEmailPending
Pre-event: errorEmail
Sent payment error email
Post-event: paymentErrorEmailSent
Final state: PaymentPending
Test #3: Submit a valid payment for the order (amount > 0.00):
$ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/1
Response: "We are processing your payment, please check your email for the order confirmation number."
The logs display the state transitions for the above case like:
Initial state: PaymentPending
Pre-event: pay
Post-event: paymentSuccess
Final state: paymentSuccessEmailPending
Initial state: paymentSuccessEmailPending
Pre-event: successEmail
Sent payment success email
Post-event: paymentSuccessEmailSent
Final state: Completed
Test #4: Invalid event:
In this test, we try to submit a payment when the order is already in the Completed state.
$ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/2
Response: "Order is completed for orderId=1e6092da-3bef-4377-8a02-5a4cb93f4a96"
No state changes are reported in the logs for this test case.
Note that the above tests consider a single customer submitting an order and paying for it. Interested readers can run the included Apache JMeter test plan to test multiple customers placing and paying for orders simultaneously.
Conclusions
The simple state machine presented in a previous article is customized for a new requirement that one or more processors need to be non-blocking. It is shown that the state transitions can be maintained even when one or more processors are non-blocking.
Readers who would like to explore more can check out Spring's reactive state machine.
Readers interested in distributed state machines can check out berndruecker.