{{announcement.body}}
{{announcement.title}}

A Non-Blocking State Machine

DZone 's Guide to

A Non-Blocking State Machine

Learn more about non-blocking state machines in this quick post!

· Java Zone ·
Free Resource

Learn more about non-blocking state machines in this quick post!

In a previous article, I presented a simple state machine for Spring Boot projects. I mentioned that the framework is easy to customize for new requirements. In this article, I illustrate the customization of the framework for a situation where one or more processors need to be non-blocking.

I'll use the same order processing example considered in the previous article here as well, but with a change in the requirement that the PaymentProcessor is a long-running process and, therefore, needs to be non-blocking.

You may also like: Java IO and NIO

State Transitions

 The state transitions for the non-blocking order processing example can be written like:                             

Initial State Pre-Event Processor Post-Event Final State
DEFAULT submit OrderProcessor  orderCreated   PMTPENDING 
PMTPENDING   pay PaymentProcessor  paymentError PMTERROREMAILPENDING
PMTERROREMAILPENDING errorEmail PaymentErrorEmailProcessor pmtErrorEmailSent PMTPENDING
PMTPENDING pay PaymentProcessor  paymentSuccess PMTSUCCESSEMAILPENDING
PMTSUCCESSEMAILPENDING successEmail PaymentSuccessEmailProcessor pmtSuccessEmailSent COMPLETED


The above states and events are configured in Java enums like:

Java




x


 
1
// OrderState.java 
2
public enum OrderState implements ProcessState {
3
    Default,
4
    paymentErrorEmailPending,
5
    PaymentPending,
6
    paymentSuccessEmailPending,
7
    Completed;
8
}
9
 
          
10
// OrderEvent.java
11
public enum OrderEvent implements ProcessEvent {
12
 
          
13
    submit {
14
        @Override
15
        public Class<? extends AbstractProcessor> nextStepProcessor() {
16
                return OrderProcessor.class;
17
        }
18
        
19
        /**
20
         * This event has no effect on state so return current state
21
         */
22
        @Override
23
        public ProcessState nextState() {
24
                return OrderState.Default;
25
        }
26
 
          
27
        @Override
28
        public String getMessage() {
29
            return "Order submitted";
30
        }
31
    },
32
    orderCreated {
33
        /**
34
         * This event does not trigger any process
35
         * So return null 
36
         */
37
        @Override
38
        public Class<? extends AbstractProcessor> nextStepProcessor() {
39
            return null;
40
        }
41
        
42
        @Override
43
        public ProcessState nextState() {
44
                return OrderState.PaymentPending;
45
        }
46
 
          
47
        @Override
48
        public String getMessage() {
49
            return "Order create, payment pending";
50
        }
51
    },
52
    pay {
53
        @Override
54
        public Class<? extends AbstractProcessor> nextStepProcessor() {
55
                return PaymentProcessor.class;
56
        }
57
        
58
        /**
59
         * This event has no effect on state so return current state
60
         */
61
        @Override
62
        public ProcessState nextState() {
63
                return OrderState.PaymentPending;
64
        }
65
 
          
66
        @Override
67
        public String getMessage() {
68
            return "We are processing your payment, please check your email for the order confirmation number";
69
        }
70
    },
71
    paymentSuccess {
72
        /**
73
         * This event does not trigger any process
74
         * So return null 
75
         */
76
        @Override
77
        public Class<? extends AbstractProcessor> nextStepProcessor() {
78
            return null;
79
        }
80
        @Override
81
        public ProcessState nextState() {
82
                return OrderState.paymentSuccessEmailPending;
83
        }
84
        @Override
85
        public String getMessage() {
86
            return "Payment success, processing success email";
87
        }
88
    },
89
    paymentError {
90
        /**
91
         * This event does not trigger any process
92
         * So return null 
93
         */
94
        @Override
95
        public Class<? extends AbstractProcessor> nextStepProcessor() {
96
            return null;
97
        }
98
        
99
        @Override
100
        public ProcessState nextState() {
101
                return OrderState.paymentErrorEmailPending;
102
        }
103
 
          
104
        @Override
105
        public String getMessage() {
106
            return "Payment processing error, processing error email";
107
        }
108
    },
109
    errorEmail {
110
        @Override
111
        public Class<? extends AbstractProcessor> nextStepProcessor() {
112
            return PaymentErrorEmailProcessor.class;
113
        }
114
        /**
115
         * This event has no effect on state so return current state
116
         */
117
        @Override
118
        public ProcessState nextState() {
119
                return OrderState.paymentErrorEmailPending;
120
        }
121
        @Override
122
        public String getMessage() {
123
            return "Payment error, processing error email";
124
        }
125
    },
126
    successEmail {
127
        @Override
128
        public Class<? extends AbstractProcessor> nextStepProcessor() {
129
            return PaymentSuccessEmailProcessor.class;
130
        }
131
        
132
        /**
133
         * This event has no effect on state so return current state
134
         */
135
        @Override
136
        public ProcessState nextState() {
137
                return OrderState.paymentSuccessEmailPending;
138
        }
139
 
          
140
        @Override
141
        public String getMessage() {
142
            return "Payment processing success, processing success email";
143
        }
144
    },
145
    paymentErrorEmailSent {
146
 
          
147
        /**
148
         * This event does not trigger any process
149
         * So return null 
150
         */
151
        @Override
152
        public Class<? extends AbstractProcessor> nextStepProcessor() {
153
            return null;
154
        }
155
        
156
        @Override
157
        public ProcessState nextState() {
158
                return OrderState.PaymentPending;
159
        }
160
 
          
161
        @Override
162
        public String getMessage() {
163
            return "Payment processing error, email sent";
164
        }
165
    },
166
    paymentSuccessEmailSent {
167
        /**
168
         * This event does not trigger any process
169
         * So return null 
170
         */
171
        @Override
172
        public Class<? extends AbstractProcessor> nextStepProcessor() {
173
            return null;
174
        }
175
        
176
        @Override
177
        public ProcessState nextState() {
178
                return OrderState.Completed;
179
        }
180
 
          
181
        @Override
182
        public String getMessage() {
183
            return "Payment processing success, order completed";
184
        }
185
    };
186
}



Asynchronous Non-Blocking Processors

A possible strategy for making a processor non-blocking is to use the Spring framework's @Async annotation in conjunction with a callback function.  

The code changes to accommodate the above-mentioned asynchronous  strategy are:

Java




x
27


 
1
// AsyncProcessor.java
2
public interface AsyncProcessor {
3
    public void processAsync(ProcessData data, Consumer<ProcessData> consumerFn);
4
}
5
 
          
6
// Processor.java
7
public interface Processor {
8
 
          
9
   public void process(ProcessData data);
10
 
          
11
}
12
 
          
13
// AbstractProcessor.java
14
public abstract class AbstractProcessor implements Processor, AsyncProcessor {
15
 
          
16
    public ProcessData process(ProcessData data) {
17
        //subclasses implement the body
18
        return null;
19
    }
20
 
          
21
    public void processAsync(ProcessData data, Consumer<ProcessData> consumerFn) {
22
        //subclasses implement the body
23
    }
24
 
          
25
    public abstract boolean isAsync();
26
}
27
 
          



Where I have introduced anAsyncProcessor, which accepts a callback function consumerFn . The  AbstractProcessor enables an implementing processor to be either synchronous or asynchronous.

An implementing processor like PaymentProcessor can adopt an asynchronous strategy like:

Java




xxxxxxxxxx
1
76


 
1
// PaymentProcessor.java
2
@Service
3
@RequiredArgsConstructor
4
public class PaymentProcessor extends AbstractProcessor {
5
    
6
    private final PaymentProcessorHelper helper;
7
    
8
    @Override
9
    public void processAsync(ProcessData data, Consumer<ProcessData> consumerFn) {
10
        helper.process(data, consumerFn);
11
    }
12
 
          
13
    @Override
14
    public boolean isAsync() {
15
        return true;
16
    }
17
}
18
 
          
19
// PaymentProcessorHelper.java
20
@Service
21
public class PaymentProcessorHelper {
22
     
23
    /**
24
     * A long running process
25
     * @param data
26
     * @param consumerFn
27
     */
28
    @Async("threadPoolTaskExecutor")
29
    public void process(ProcessData data, Consumer<ProcessData> consumerFn) {
30
        try{
31
            //simulate a long running process
32
            Thread.sleep(2000);
33
 
          
34
            if(((OrderData)data).getPayment() > 0) {
35
                ((OrderData)data).setEvent(OrderEvent.paymentSuccess);
36
            } else {
37
                ((OrderData)data).setEvent(OrderEvent.paymentError);
38
            }
39
        }catch(InterruptedException e){
40
            //TODO: Use a new state transition to include system error
41
        }
42
        consumerFn.accept(data);
43
    }
44
}
45
 
          
46
// AppConfig.java
47
@Configuration
48
@Slf4j
49
@EnableAsync
50
public class AppConfig implements AsyncConfigurer {
51
 
          
52
    @Bean(name = "threadPoolTaskExecutor")
53
    public Executor taskExecutor() {
54
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
55
        executor.setCorePoolSize(2);
56
        executor.setMaxPoolSize(2);
57
        executor.setQueueCapacity(20);
58
        executor.setThreadNamePrefix("StateMachine-");
59
        executor.initialize();
60
        return executor;
61
    }
62
 
          
63
    /**
64
     * Catches unhandled exceptions in @Async methods
65
     */
66
    @Override
67
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
68
        return new AsyncUncaughtExceptionHandler(){
69
            @Override
70
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
71
                log.error("Exception in the async method {}.{}() {} {}", method.getDeclaringClass().getSimpleName(), method.getName(), ex.getClass().getSimpleName(), ex.getMessage());     
72
            }
73
        };
74
    }
75
}
76
 
          



... where the PaymentProcessor class is made non-blocking and asynchronous. All other processors are synchronous. The source for these classes is not shown here for brevity. The complete source is on GitHub. A thread pool management strategy like the one mentioned here is also used.

State Transitions Manager

The OrderStateTransitionsManager class takes the form:

Java
xxxxxxxxxx
1
78
 
1
// OrderStateTransitionsManager.java
2
@RequiredArgsConstructor
3
@Slf4j
4
@Service
5
public class OrderStateTransitionsManager extends AbstractStateTransitionsManager {
6
 
          
7
    private final ApplicationContext context;
8
    private final OrderDbService dbService;
9
 
          
10
    @Override
11
    protected void processPostEvent(ProcessData data) {
12
 
          
13
        log.info("Post-event: " + data.getEvent().toString());
14
        dbService.getStates().put(((OrderData) data).getOrderId(),
15
                (OrderState) data.getEvent().nextState());
16
                
17
        //if the post-event is either paymentSuccess or paymentError
18
        //then initiate the email process
19
        log.info("Final state: " + dbService.getStates().get(((OrderData) data).getOrderId()).name());
20
        log.info("??*************************************");
21
 
          
22
        if ((OrderEvent) data.getEvent() == OrderEvent.paymentSuccess) {
23
            ((OrderData) data).setEvent(OrderEvent.successEmail);
24
            processPreEvent(data);
25
        } else if ((OrderEvent) data.getEvent() == OrderEvent.paymentError) {
26
            ((OrderData) data).setEvent(OrderEvent.errorEmail);
27
            processPreEvent(data);
28
        } 
29
    }
30
 
          
31
    @Override
32
    protected ProcessData processStateTransition(ProcessData sdata) throws ProcessException {
33
 
34
        OrderData data = (OrderData) sdata;
35
        log.info("Pre-event: " + data.getEvent().toString());
36
        AbstractProcessor processor = this.context.getBean(data.getEvent().nextStepProcessor());
37
        if (processor.isAsync()) {
38
            processor.processAsync(data, this::processPostEvent);
39
        } else {
40
            data = (OrderData)processor.process(data);
41
            processPostEvent(data);
42
        }
43
        return data;
44
    }
45
    @Override
46
    protected void initializeState(ProcessData sdata) throws OrderException {
47
 
          
48
        OrderData data = (OrderData) sdata;
49
 
          
50
        //validate state
51
        checkStateForReturningCustomers(data);
52
 
          
53
        if ((OrderEvent) data.getEvent() == OrderEvent.submit) {
54
            UUID orderId = UUID.randomUUID();
55
            data.setOrderId(orderId);
56
        }
57
 
          
58
        dbService.getStates().put(data.getOrderId(), (OrderState)((OrderEvent) data.getEvent()).nextState());
59
 
          
60
        log.info("Initial state: " + dbService.getStates().get(data.getOrderId()).name());
61
    }
62
 
          
63
    private void checkStateForReturningCustomers(OrderData data) throws OrderException {
64
        // returning customers must have a state
65
        if (data.getOrderId() != null) {
66
            if (this.dbService.getStates().get(data.getOrderId()) == null) {
67
                throw new OrderException("No state exists for orderId=" + data.getOrderId());
68
            } else if (this.dbService.getStates().get(data.getOrderId()) == OrderState.Completed) {
69
                throw new OrderException("Order is completed for orderId=" + data.getOrderId());
70
            } 
71
        }
72
    }
73
 
          
74
    public ConcurrentHashMap<UUID, OrderState> getStates() {
75
        return dbService.getStates();
76
    }
77
}
78
 
          


The complete source is on GitHub.

Order Processing Demo

The order processing demo includes two APIs, which can be used for testing, one to submit an order for which a synchronous response is returned and the other to submit a payment for the order. When the payment is submitted, the user gets a synchronous message saying the payment confirmation number will be sent in an email. Payment processing is performed asynchronously and an email is sent. Here are the requests and responses for the two APIs:

Test #1: Submit order:

 $ curl -X POST http://localhost:8080/order/items 

Response: "Order submitted, orderId = 1e6092da-3bef-4377-8a02-5a4cb93f4a96"

The logs display the state transitions like:

Initial state: Default
Pre-event: submit
Post-event: orderCreated
Final state: PaymentPending 

Test #2: Submit an invalid payment for the order (amount =  0.00):

 $ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/0

Response: "We are processing your payment, please check your email for the order confirmation number."

The logs display the state transitions for the above cases like:

Initial state: PaymentPending 
Pre-event: pay 
Post-event: paymentError 
Final state: paymentErrorEmailPending

Initial state: paymentErrorEmailPending
Pre-event: errorEmail
Sent payment error email
Post-event: paymentErrorEmailSent
Final state: PaymentPending  

Test #3: Submit a valid payment for the order (amount > 0.00):

 $ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/1

Response: "We are processing your payment, please check your email for the order confirmation number."

The logs display the state transitions for the above cases like:

Initial state: PaymentPending 
Pre-event: pay 
Post-event: paymentSuccess 
Final state: paymentSuccessEmailPending

Initial state: paymentSuccessEmailPending
Pre-event: successEmail
Sent payment success email
Post-event: paymentSuccessEmailSent
Final state: Completed  

Test #4: Invalid event:

In this testing we try to submit a payment when the process state is already in COMPLETED status.
$ curl -X POST http://localhost:8080/orders/1e6092da-3bef-4377-8a02-5a4cb93f4a96/payment/2

Response:"Order is completed for orderId=1e6092da-3bef-4377-8a02-5a4cb93f4a96"

No state changes are reported in the logs for this test case. 

Note that in the above tests I have considered one customer submitting order and paying for the order. Interested readers can run the included Apache JMeter test plan to test for multiple customers simultaneously placing orders and paying for the orders. 

Conclusions

The simple state machine presented in a previous article is customized for a new requirement that one or more processors need to be non-blocking. It is shown that the state transitions can be maintained even when one or more processors are non-blocking. 

Readers who would like to explore more can check out Spring's reactive state machine.
Readers interested in distributed state machines can checkout berndruecker.

Further Reading

Java IO and NIO

Why Non-Blocking? 

Topics:
state machine ,spring boot ,java ,non-blocking ,asynchronous

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}