Java DSL for Spring Integration 1.2 Milestone 2 Is Available
With the release of the Java DSL for Spring Integration 1.2 M2 now available, we take a look at what's included and what can be done!
Join the DZone community and get the full member experience.
Join For Free1.2 M2
is available now!First of all, I’d like to thank everyone who created issues, raised Pull Requests, provided feedback or just asked questions on StackOverflow. Especial thanks for early adopters since the previous Milestone 1. With their help, we have improved and fixed some issues with runtime flow registration.
The artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.M2
is available in the Milestone repo. So, give it a shot and don’t hesitate to raise a GH issue for any feedback!
Some highlights of the current iteration:
JPA Support
After many Community requests we finally introduced the Jpa
Factory and corresponding `IntegrationComponentSpec`s to provide a fluent API for the Spring Integration JPA components:
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("pollingResults"))
.get();
}
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
Mid-Flow Transaction Support
"Inspired" by the complexity of transaction support configuration for the Spring Integration JPA components (actually programmatic TransactionalInterceptor
), we have introduced TransactionInterceptorBuilder
. In addition, we provide the TransactionHandleMessageAdvice
, which allows you to start transactions from any endpoint for the entire sub-flow, not only the handleRequestMessage
as it is in the case of regular ConsumerEndpointSpec.advice()
. Actually, the main trick is done by the HandleMessageAdvice
, recently introduced in Spring Integration Core, which is a marker interface to distinguish advice for the handleRequestMessage
only or for the flow starting from the current MessageHandler
. For convenience, a bunch of .transactional()
methods have been added to the ConsumerEndpointSpec
.
Scatter-Gather Support
The Scatter-Gather EI pattern now has its own Java DSL API:
@Bean
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10)),
gatherer -> gatherer
.releaseStrategy(group ->
group.size() == 3 ||
group.getMessages()
.stream()
.anyMatch(m -> (Double) m.getPayload() > 5)),
scatterGather -> scatterGather
.gatherTimeout(10_000));
}
Where the scatterer
is just a RecipientListRouter
, gatherer
— an AggregatingMessageHandler
, and the last Consumer
accept options for the ScatterGatherHandler
.
More Routers Improvements
The .routeToRecipients()
API now provides more configuration variants for recipients:
.routeToRecipients(r -> r
.recipient("foo-channel", "'foo' == payload")
.recipient("bar-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("baz"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"bax".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
Previously, the .route()
operator made the next .channel()
in the IntegrationFlow
as a defaultOutputChannel
of the Router
. According to the user experience, it doesn’t sound reasonable to make such a decision unconditional. We reworked .route()
to align it with the standard AbstractMessageRouter
behavior. The .defaultOutputChannel()
and .defaultSubFlowMapping()
have been added to utilize the default
logic for the Router
. To rollback to the previous behavior, the .defaultOutputToParentFlow()
is present, as you noticed by the .routeToRecipients()
sample above.
See the commit history for 1.2.0.M2
version for more information. And always read JavaDocs to understand the API you use!
Next Steps
We expect the first (and, we hope, the last) Release Candidate for version 1.2
in a couple weeks, after some adoption for Spring Integration 4.3.2 and Spring Boot 1.4.1. It’s soon enough, as spring-integration-java-dsl
will move to the Spring Integration Core5.0
and Java 8 code base. The current 1.2
version will be still supported, but just for bug fixes.
Published at DZone with permission of Artem Bilan. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments