Unveiling RxMule
I'm super excited to announce RxMule, my latest open source project. RxMule provides reactive extensions for Mule ESB, via a set of specific bindings for RxJava.
For several years, I've been mulling over the idea of creating a DSL for configuring Mule. Indeed, there is a treasure trove of pre-existing transports and connectors for Mule, which is very compelling for anyone building connected applications (which, nowadays, is probably everybody). Unfortunately, developers can be put off by the XML-based DSL used to configure Mule, and thus may pass up the opportunity to leverage all this available goodness.
As Rx is gaining traction, more and more developers are getting accustomed to its concepts and primitives. With this in mind, and knowing that Mule is at its core an event processing platform, it dawned on me that instead of creating a DSL that would mimic the XML artifacts (which are Mule-specific), I'd rather create bindings that allow using Mule's essential moving parts via Rx.
In summary, RxMule adds a number of classes to RxJava that make it possible to observe:
- Mule inbound endpoints from traditional transports, including global endpoints and endpoints defined by URIs,
- raw message sources, like the new HTTP Listener Connector,
- Anypoint Connectors message sources.
In short, RxMule allows creating Observable<MuleEvent> instances from different sources. A MuleEvent is what Mule creates and processes. It wraps a MuleMessage, which contains the actual data and metadata being processed.
You can read more about the structure of a MuleMessage here.
The following demonstrates an asynchronous HTTP-to-Redis bridge that accepts only one request per remote IP:
    rxMule
        .observeEndpointAsync(new URI("http://localhost:8080/publish"))
        .distinct(
            muleEvent -> {
                final String remoteAddressAndPort =
                    muleEvent
                        .getMessage()
                        .getInboundProperty("MULE_REMOTE_CLIENT_ADDRESS");
                return substringBefore(remoteAddressAndPort, ":");
            })
        .subscribe(
            asAction((MessageConsumer) muleEvent -> {
                redisModule.publish(
                    "http-requests",
                    false,
                    muleEvent.getMessageAsString(),
                    muleEvent);
                LOGGER.info("Published: {}", muleEvent);
            }));
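To make the "one request per remote IP" part more concrete, here is a minimal sketch of what a key-based distinct step does, written with plain JDK collections so it runs without Mule or RxJava on the classpath. It is not RxMule or RxJava code: the class name, the distinctByKey and hostOf helpers, and the sample addresses are all made up for illustration, and it assumes the remote client address arrives as "address:port", as the variable name in the bridge above suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Illustration only: models a key-based distinct over a finite list,
// not over a live stream of Mule events.
class DistinctByKeySketch {

    // Hypothetical helper: keeps the part of "address:port" before the
    // first colon, or the whole string when there is no colon.
    static String hostOf(String remoteAddressAndPort) {
        int colon = remoteAddressAndPort.indexOf(':');
        return colon < 0
            ? remoteAddressAndPort
            : remoteAddressAndPort.substring(0, colon);
    }

    // Keeps only the first item seen for each key, preserving arrival order.
    static <T, K> List<T> distinctByKey(List<T> items, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> firstPerKey = new ArrayList<>();
        for (T item : items) {
            // Set.add returns false when the key was already recorded.
            if (seenKeys.add(keySelector.apply(item))) {
                firstPerKey.add(item);
            }
        }
        return firstPerKey;
    }

    public static void main(String[] args) {
        // Made-up sample values: two requests share the address 10.0.0.1.
        List<String> remoteAddresses = Arrays.asList(
            "10.0.0.1:50001", "10.0.0.2:50002", "10.0.0.1:50003");
        // Prints [10.0.0.1:50001, 10.0.0.2:50002]
        System.out.println(distinctByKey(remoteAddresses, DistinctByKeySketch::hostOf));
    }
}
```

The sketch also shows the cost of the approach: every key seen so far has to be remembered, so on a long-lived endpoint the set of remote addresses keeps growing for as long as the subscription is active.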
Published at DZone with permission of David Dossot, DZone MVB. See the original article here.