The Latest Integration Topics

NetBeans IDE 7.2 Introduces TestNG
One of the advantages of code generation is the ability to see how a specific language feature or framework is used. As I discussed in the post NetBeans 7.2 beta: Faster and More Helpful, NetBeans 7.2 beta provides TestNG integration. I did not elaborate further in that post, other than a single reference to the feature, because I wanted to devote this post to the subject. I use this post to demonstrate how NetBeans 7.2 can help a developer new to TestNG start using this alternative (to JUnit) test framework.

NetBeans 7.2's New File wizard makes it easier to create an empty TestNG test case. The wizard is started with New File | Unit Tests (note that "New File" is available under the "File" drop-down menu or by right-clicking in the Projects window). Running the TestNG test case creation wizard leads to the following generated test code.

TestNGDemo.java (Generated by NetBeans 7.2)

    package dustin.examples;

    import org.testng.annotations.AfterMethod;
    import org.testng.annotations.AfterClass;
    import org.testng.annotations.BeforeMethod;
    import org.testng.annotations.BeforeClass;
    import org.testng.annotations.Test;
    import org.testng.Assert;

    /**
     *
     * @author Dustin
     */
    public class TestNGDemo {

        public TestNGDemo() {
        }

        @BeforeClass
        public void setUpClass() {
        }

        @AfterClass
        public void tearDownClass() {
        }

        @BeforeMethod
        public void setUp() {
        }

        @AfterMethod
        public void tearDown() {
        }

        // TODO add test methods here.
        // The methods must be annotated with annotation @Test. For example:
        //
        // @Test
        // public void hello() {}
    }

The test generated by NetBeans 7.2 includes comments that indicate how test methods are added and annotated (similar to modern versions of JUnit). The generated code also shows annotations for overall test case set up and tear down (@BeforeClass, @AfterClass) and for per-test set up and tear down (@BeforeMethod, @AfterMethod); these annotations are similar to JUnit's.
NetBeans identifies import statements that are not yet used at this point (import org.testng.annotations.Test; and import org.testng.Assert;), but they are likely to be used and so have been included in the generated code. I can easily add a test method to this generated test case. The following code snippet is a test method using TestNG.

testIntegerArithmeticMultiplyIntegers()

    @Test
    public void testIntegerArithmeticMultiplyIntegers() {
        final IntegerArithmetic instance = new IntegerArithmetic();
        final int[] integers = {4, 5, 6};
        final int expectedProduct = 2 * 3 * 4 * 5 * 6;
        final int product = instance.multiplyIntegers(2, 3, integers);
        // TestNG's assertEquals takes the actual value first, then the expected value.
        Assert.assertEquals(product, expectedProduct);
    }

This, of course, looks very similar to the JUnit equivalent I used against the same IntegerArithmetic class for testing illustrations in the posts Improving On assertEquals with JUnit and Hamcrest and JUnit's Built-in Hamcrest Core Matcher Support. Right-clicking on the test case class and selecting "Run File" (Shift+F6) runs the test in NetBeans 7.2 beta. The text output of that TestNG run is reproduced next.
    [TestNG] Running:
      Command line suite

    [VerboseTestNG] RUNNING: Suite: "Command line test" containing "1" Tests (config: null)
    [VerboseTestNG] INVOKING CONFIGURATION: "Command line test" - @BeforeClass dustin.examples.TestNGDemo.setUpClass()
    [VerboseTestNG] PASSED CONFIGURATION: "Command line test" - @BeforeClass dustin.examples.TestNGDemo.setUpClass() finished in 33 ms
    [VerboseTestNG] INVOKING CONFIGURATION: "Command line test" - @BeforeMethod dustin.examples.TestNGDemo.setUp()
    [VerboseTestNG] PASSED CONFIGURATION: "Command line test" - @BeforeMethod dustin.examples.TestNGDemo.setUp() finished in 2 ms
    [VerboseTestNG] INVOKING: "Command line test" - dustin.examples.TestNGDemo.testIntegerArithmeticMultiplyIntegers()
    [VerboseTestNG] PASSED: "Command line test" - dustin.examples.TestNGDemo.testIntegerArithmeticMultiplyIntegers() finished in 12 ms
    [VerboseTestNG] INVOKING CONFIGURATION: "Command line test" - @AfterMethod dustin.examples.TestNGDemo.tearDown()
    [VerboseTestNG] PASSED CONFIGURATION: "Command line test" - @AfterMethod dustin.examples.TestNGDemo.tearDown() finished in 1 ms
    [VerboseTestNG] INVOKING CONFIGURATION: "Command line test" - @AfterClass dustin.examples.TestNGDemo.tearDownClass()
    [VerboseTestNG] PASSED CONFIGURATION: "Command line test" - @AfterClass dustin.examples.TestNGDemo.tearDownClass() finished in 1 ms
    [VerboseTestNG]
    [VerboseTestNG] ===============================================
    [VerboseTestNG]     Command line test
    [VerboseTestNG]     Tests run: 1, Failures: 0, Skips: 0
    [VerboseTestNG] ===============================================

    ===============================================
    Command line suite
    Total tests run: 1, Failures: 0, Skips: 0
    ===============================================

    Deleting directory C:\Users\Dustin\AppData\Local\Temp\dustin.examples.TestNGDemo
    test:
    BUILD SUCCESSFUL (total time: 2 seconds)

The above example shows how easy it is to start using TestNG, especially if one is moving to TestNG from JUnit and is using NetBeans 7.2 beta. Of course, there is much more to TestNG than this, but learning a new framework is typically most difficult at the very beginning, and NetBeans 7.2 gets one off to a fast start.
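The IntegerArithmetic class exercised by the test method is not shown in this post. As a rough sketch only, the class below shows one way such a method could be written. The signature (two required ints followed by a varargs array) is inferred from the call in the test, and the body is an assumption rather than the author's code. A plain main method stands in for the TestNG runner so the sketch runs without any test framework on the classpath.

```java
/** Hypothetical stand-in for the IntegerArithmetic class referenced by the test. */
final class IntegerArithmetic {

    /**
     * Multiplies two required integers and any further integers supplied.
     * Assumed implementation; the original class body is not shown in the post.
     */
    public int multiplyIntegers(final int first, final int second, final int... others) {
        int product = first * second;
        for (final int other : others) {
            product *= other;
        }
        return product;
    }

    public static void main(final String[] args) {
        final IntegerArithmetic instance = new IntegerArithmetic();
        final int[] integers = {4, 5, 6};
        // Same call the TestNG test method makes: 2 * 3 * 4 * 5 * 6
        System.out.println(instance.multiplyIntegers(2, 3, integers));
    }
}
```

Passing an int[] where an int... parameter is expected is legal Java, which is why the test can hand over the integers array directly.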
June 11, 2012
by Dustin Marx
· 21,509 Views · 1 Like
Spring Integration - Robust Splitter Aggregator
A Robust Splitter Aggregator Design Strategy - Messaging Gateway Adapter Pattern

What do we mean by robust?

In the context of this article, robustness refers to an ability to manage exception conditions within a flow without immediately returning to the caller. In some processing scenarios, n of m responses is good enough to proceed to conclusion. Example processing scenarios that typically have these tendencies are:

Quotations for finance, insurance and booking systems.
Fan-out publishing systems.

Why do we need Robust Splitter Aggregator Designs?

First and foremost, an introduction to a typical Splitter Aggregator pattern may be necessary. The Splitter is an EIP (Enterprise Integration Patterns) pattern that describes a mechanism for breaking composite messages into parts so that they can be processed individually. A Router is an EIP pattern that describes routing messages into channels, aiming them at specific messaging endpoints. The Aggregator is an EIP pattern that collates and stores a set of messages that belong to a group, and releases them when that group is complete.

Together, those three EIP constructs form a powerful mechanism for dividing processing into distinct units of work. Spring Integration (SI) uses the same pattern terminology as EIP, so readers of that methodology will be quite comfortable with Spring Integration Framework constructs. The SI Framework allows significant customisation of all three of those constructs and furthermore, by simply using asynchronous channels as you would in any other multi-threaded configuration, allows those units of work to be executed in parallel.

An interesting challenge when working with SI Splitter Aggregator designs is building appropriately robust flows that operate predictably in a number of invocation scenarios. A simple splitter aggregator design can be used in many circumstances and operate without heavy customisation of the SI constructs. However, some service requirements demand a more robust processing strategy and therefore more complex configuration. The following sections describe what a Simple Splitter Aggregator design looks like and the type of processing your design must be able to deal with, and then suggest candidate solutions for more robust processing.

A Simple Splitter Aggregator Design

The following Splitter Aggregator design shows a simple flow that receives document request messages into a messaging gateway, splits the message into two processing routes and then aggregates the responses. Note that the diagram has been built from EIP constructs in OmniGraffle rather than being an Integration Graph view from within STS; the channels are missing from the diagram for the sake of brevity.

SI Constructs in detail:

Messaging Gateways - there are three messaging gateways. A number of configurations are available for gateway specifications but, significantly, gateways can return business objects, exceptions and nulls (following a timeout). The gateway to the far left is the service gateway for which we are defining the flow. The other two gateways, between the Router and Aggregator, are external systems that will be providing responses to business questions that our flow generates.
The Splitter - a single splitter exists and is responsible for consuming the document message and producing a collection of messages for onward processing. The Java signature for the Splitter, which is most often custom, specifies a single object argument and a collection for return.
The Recipient List Router - a single router exists. Any appropriate router can be used; choose the one that most closely matches your requirements. You can easily route by expression or payload type. The primary purpose of the router is to route the collection of messages supplied by the splitter. This is a pretty typical Splitter Aggregator configuration.
Aggregator - a single construct that is responsible for collecting messages together in a group so that further processing can take place on the gateway responses. Although the Aggregator can be configured with attributes and bean definitions to provide alternative grouping and release strategies, most often the default aggregation strategy suffices.

Interesting Aspects of Splitter Aggregator Operation

Gateway - the inbound gateway, the one on the far left, may or may not have an error handling bean reference defined on it. If it does, that bean will have an opportunity to handle any exceptions thrown within the flow to the right of that gateway. If not, any exception will be thrown straight out of the gateway.
Gateway - an optional default-reply-timeout can be set on each of the gateways. There are significant implications for setting this value, so ensure that they're well understood. An expired timeout will result in a null being returned from the gateway. This is the very same condition that can lead to a thread getting parked if an upstream gateway also has no default-reply-timeout set.
Splitter Input Channel - this can be a simple direct channel or a direct channel with a dispatcher defined on it. If the channel has a dispatcher specified, the flow downstream of this point will be asynchronous and multi-threaded. This also changes the upstream gateway semantics, as it usually means that an otherwise impotent default-reply-timeout becomes active.
Splitter - the splitter must return a single object. The single object returned by the splitter is a collection, a java.util.List. The SI framework will take each member of that list and feed it into the output-channel of the Splitter; as in this example, that is usually straight into a router. The contract for the returned List is the same as for any Java List: it may contain zero, one or more elements. If the splitter returns an empty list, it's unlikely that the router will have any work to do and so the flow invocation will be complete. However, if the List contains one item, the SI framework will extract that item from the list and push it into the router; if it gets routed successfully, the flow will continue.
Router - the router will simply route messages into one of two gateways in this example.
Gateways - the two gateways that are used between the Splitter and Aggregator are interesting. In this example I have used the generic gateway EIP pattern to represent a message sub-system but not defined it explicitly; we could use an HTTP outbound gateway, another SI flow or any other external system. Of course, for each of those sub-systems, a number of responses is possible. Depending on the protocol and external system, the message request may fail to send, the response may fail to arrive, a long-running process may be invoked, or a network error, timeout or general processing exception may occur.
Aggregator - the single aggregator will wait for a number of responses depending on what's been created by the Splitter. In the case where the Splitter return list is empty, the Aggregator will not get invoked. In the case where the Splitter return list has one entry, the Aggregator will be waiting for one gateway response to complete the group. In the case where the Splitter list has n entries, the Aggregator will be waiting for n entries to complete the group. Custom correlation strategies, release strategies and message stores can be injected amongst a set of rich configuration aspects.

Interesting Aspects of Simple Splitter Aggregator Operation

The primary deciding factor for establishing whether this type of simple gateway is adequate for requirements is to understand what happens in the event of failure. If any exception occurring in your SI flow results in the flow invocation being abandoned and that suits your requirements, there's no need to read any further. If, however, you need to continue processing following failure in one of the gateways, the remainder of this article may be of more interest.

Exceptions, from any source, generated between the splitter and aggregator will result in an empty or partial group being discarded by the Aggregator. The exception will propagate back to the closest upstream gateway for either handling by a custom bean or re-throwing by the gateway. Note that a custom release strategy on the Aggregator is difficult to use, especially alongside timeouts, and would not help in this case because the exception will propagate back to the leftmost gateway before the aggregator is invoked.

It's also possible to configure exception handlers on the innermost gateways so that the exception message is caught. But how do you then route messages from a custom exception handler into the aggregator to complete the group? Inject the aggregator channel definition into the custom exception handler? This is a poor approach and would involve unpacking an exception message payload, copying the original message headers into a new SI message and then adding the original payload. That is only four or five lines of code, but it is dirty.

Following exception generation, exception messages (without modification) cannot be routed into an Aggregator to complete the group. The original message, the one that contains the correlation and sequence ids for the group and group position, is buried inside the SI exception message's payload.
If processing needs to continue following exception generation, it should be clear that the following must take place:

the aggregation group needs to be completed,
any exceptions must be caught and handled before getting back to the closest upstream gateway,
the correlation and sequence identifiers that allow group completion in the aggregator, which are buried within the exception message payload, must be extracted and set on the message that's bound for the aggregator.

A More Robust Solution - Messaging Gateway Adapter Pattern

Dealing with exceptions and null returns from gateways naturally leads to a design that implements a wrapper around the messaging gateway. This affords a level of control that would otherwise be very difficult to establish.

This adapter technique allows all returns from messaging gateways to be caught and processed, because the messaging gateway is injected into the Service Activator and called directly from it. The messaging gateway no longer responds to the aggregator directly; it responds to a custom Java Spring bean configured in the Service Activator namespace definition.

As expected, processing that does not encounter an exception will continue as normal. Those flows that experience exception conditions, or unexpected or missing responses from messaging gateways, need to process messages in such a way that message groups bound for aggregation can be completed. If the Service Activator were to allow the exception to be propagated outside of its backing bean, the group would not complete. The same applies not just to exceptions but to any return object that does not carry the prerequisite group correlation id and sequence headers; this is where the adaptation is applied.
Exception messages or null responses from messaging gateways are caught and handled as shown in the following example code:

    import com.l8mdv.sample.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.Message;
    import org.springframework.integration.MessageHeaders;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.util.Assert;

    public class AvsServiceImpl implements AvsService {

        private static final Logger logger = LoggerFactory.getLogger(AvsServiceImpl.class);
        public static final String MISSING_MANDATORY_ARG = "Mandatory argument is missing.";
        private AvsGateway avsGateway;

        public AvsServiceImpl(final AvsGateway avsGateway) {
            this.avsGateway = avsGateway;
        }

        public Message service(Message message) {
            Assert.notNull(message, MISSING_MANDATORY_ARG);
            Assert.notNull(message.getPayload(), MISSING_MANDATORY_ARG);

            MessageHeaders requestMessageHeaders = message.getHeaders();
            Message responseMessage = null;
            try {
                logger.debug("Entering AVS Gateway");
                responseMessage = avsGateway.send(message);
                // A null reply (for example after a gateway timeout) is replaced
                // with a response that still carries the request headers.
                if (responseMessage == null)
                    responseMessage = buildNewResponse(requestMessageHeaders,
                            AvsResponseType.NULL_RESULT);
                logger.debug("Exited AVS Gateway");
                return responseMessage;
            } catch (Exception e) {
                // responseMessage is still null whenever send() throws, so the
                // request message is passed here; passing responseMessage would
                // trip the not-null assertion and let an exception escape.
                return buildNewResponse(message, requestMessageHeaders,
                        AvsResponseType.EXCEPTION_RESULT, e);
            }
        }

        private Message buildNewResponse(MessageHeaders requestMessageHeaders,
                                         AvsResponseType avsResponseType) {
            Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
            Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);

            AvsResponse avsResponse = new AvsResponse();
            avsResponse.setError(avsResponseType);

            // Copy correlation and sequence headers so the aggregation group can complete.
            return MessageBuilder.withPayload(avsResponse)
                    .copyHeadersIfAbsent(requestMessageHeaders).build();
        }

        private Message buildNewResponse(Message requestMessage,
                                         MessageHeaders requestMessageHeaders,
                                         AvsResponseType avsResponseType,
                                         Exception e) {
            Assert.notNull(requestMessage, MISSING_MANDATORY_ARG);
            Assert.notNull(requestMessage.getPayload(), MISSING_MANDATORY_ARG);
            Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
            Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);
            Assert.notNull(e, MISSING_MANDATORY_ARG);

            AvsResponse avsResponse = new AvsResponse();
            avsResponse.setError(avsResponseType, requestMessage.getPayload(), e);

            // Copy correlation and sequence headers so the aggregation group can complete.
            return MessageBuilder.withPayload(avsResponse)
                    .copyHeadersIfAbsent(requestMessageHeaders).build();
        }
    }

Notice the return statement in the catch clause of the exception handling block. The buildNewResponse call it makes copies the correlation and sequence headers from the request into the response message. This is mandatory if the aggregation group is going to be allowed to complete, and it will always be necessary following an exception as shown here.

Consequences of using this technique

There's no doubt that introducing a Messaging Gateway Adapter into SI config makes the configuration more complex to read and follow. The key factor here is that there is no longer a linear progression through the configuration file. This is because the Service Activator must either forward reference a Gateway, or the Gateway must be defined before its adapting Service Activator; in both cases the result is the same.

Resources

Note: The design for the software that drove creation of this meta-pattern was based on a requirement that a number of external risk assessment services would be accessed by a single, central Risk Assessment Service. In order to satisfy clients of the service, invocation had to take place in parallel and continue despite failure in any one of those external services. This requirement led to the design of the Messaging Gateway Adapter Pattern for the project.

Spring Integration Reference Manual

The solution approach for this problem was discussed directly with Mark Fisher (SpringSource) in the context of building Risk Assessment flows for a large US financial institution.
Although the configuration and code are protected by NDA and copyright, it's acceptable to express the design intention and similar code in this article.
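To make the group-completion argument concrete, here is a minimal plain-Java sketch of the idea behind the adapter. It deliberately uses no Spring Integration types: Msg, adapt, groupComplete and run are hypothetical names introduced for illustration, and the release rule (group size equals sequence size) is a simplified stand-in for an aggregator's default behaviour, not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class GatewayAdapterSketch {

    /** Simplified message: a payload plus the headers an aggregator relies on. */
    static final class Msg {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;
        final String payload;

        Msg(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    /**
     * Adapter around a gateway call: whatever happens, a reply carrying the
     * request's correlation and sequence data is returned to the caller.
     */
    static Msg adapt(Msg request, Function<String, String> gateway) {
        String result;
        try {
            result = gateway.apply(request.payload);
            if (result == null) {
                result = "NULL_RESULT";            // gateway returned nothing
            }
        } catch (RuntimeException e) {
            result = "EXCEPTION_RESULT: " + e.getMessage();
        }
        return new Msg(request.correlationId, request.sequenceNumber,
                request.sequenceSize, result);
    }

    /** Simplified release rule: the group is complete once all members have arrived. */
    static boolean groupComplete(List<Msg> group) {
        return !group.isEmpty() && group.size() == group.get(0).sequenceSize;
    }

    /** Sends two split messages through the adapter; the second gateway call fails. */
    static List<Msg> run() {
        List<Msg> requests = Arrays.asList(
                new Msg("doc-1", 1, 2, "quote-A"),
                new Msg("doc-1", 2, 2, "quote-B"));
        List<Msg> group = new ArrayList<>();
        for (Msg request : requests) {
            group.add(adapt(request, payload -> {
                if (payload.equals("quote-B")) {
                    throw new IllegalStateException("service unavailable");
                }
                return payload + " ok";
            }));
        }
        return group;
    }

    public static void main(String[] args) {
        List<Msg> group = run();
        System.out.println("group complete: " + groupComplete(group));
        for (Msg reply : group) {
            System.out.println(reply.sequenceNumber + "/" + reply.sequenceSize + " " + reply.payload);
        }
    }
}
```

Without the try/catch in adapt, the failure in the second call would propagate to the caller and the group would hold only one of its two members, which is the situation the article describes for the simple design.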
June 3, 2012
by Matt Vickery
· 23,373 Views
Eclipse Working Sets Explained
Eclipse comes with a large set of different views: they allow the developer to represent the information in various forms and from different angles. Most of these views are navigation oriented: a perfect example of this is the Projects view or the Outline view. But over time I add more projects and more resources to my projects, and at a certain point things get overwhelming. I have a lot of projects, and I do not want to switch between workspaces too often. Yes, I can open and close projects, but this gets cumbersome too. Thankfully, there is a solution in Eclipse: working sets.

Working sets allow me to group elements for display in views. With that, I can do operations on the set of elements in that working set. Especially as I’m using many projects at the same time, working sets are a big help to focus on the right set of things at a time. I can define a set of things I want to look at, work with, or whatever: it allows me to be productive in the universe of my environment.

Building/Compiling a Working Set

A nice feature of working sets is building a set of projects. Instead of selecting a set of projects and then compiling them together, I use a working set. I use the menu Project > Build Working Set > Select Working Set… to create or change working sets:

[Figure: Menu to Select Working Set]

If I have no working set defined, then this will show the following dialog where I can press New… to create a new one:

[Figure: Creating New Working Set]

To create a working set of C/C++ projects, I select C/C++ and press Next:

[Figure: New C/C++ Working Set]

Next I give a name and select the project(s) which shall be in my working set, and press Finish:

[Figure: Defining Working Set]

To build my set of projects, I can select the working set and press OK:

[Figure: Selected Working Set]

Search in a Working Set

It is possible to limit the search to a working set. For this I can choose a working set as scope in the search dialogs:

[Figure: Search in a Working Set]

Managing Working Sets

To manage working sets, I press Ctrl+3 (see Quick Access) and choose Manage Working Sets…:

[Figure: Ctrl+3 with Manage Working Sets…]

Note: I can add extra toolbars and menus for working sets too (this is explained later).

Then I can manage my working sets:

[Figure: Manage Working Set Configuration]

Project View Filtering with Working Sets

I can filter the projects shown in the Project view based on working sets. For this I select the small triangle and select/define a working set, or choose one from the most recently used sets:

[Figure: Working Sets for Project View]

With this I can easily filter and focus on a subset of projects:

[Figure: Project View with Applied Working Set]

Really cool tip: I’m using working sets as well to avoid too many workspaces. Instead of having projects spread over different workspaces, I can keep them in one workspace and use working sets instead.

There is an added benefit of using working sets: having too many projects open at the same time in Eclipse can slow down the IDE. Using working sets allows me to switch quickly between the sets of projects I’m working on.

But it does not stop at filtering by projects: you can even filter things inside the project structure. I simply deselect things I don’t want to see and can focus on what is important for me:

[Figure: Filtering Project Files]

Export and Import of Working Sets

Note: Import and export of working sets is not part of the standard CodeWarrior Eclipse distribution. You get the import/export feature installed with the MQX plugins (www.freescale.com/mqx) or with the AnyEdit plugins (http://andrei.gmxhome.de/anyedit/index.html).

To export a working set, I use the menu File > Export > Other > Export Working Set:

[Figure: Export Working Sets with MQX Plugins]

Note: The AnyEdit plugins come as well with an import/export working set wizard. The file format is different, and the MQX plugin allows drag&drop of the file into Eclipse.

This gives the following dialog where I can specify the file name and the root of projects:

[Figure: Export Working Set Dialog]

This will store the settings in an XML file. Importing the working set is done with File > Import > Other > Import Working Set.

Tip: I’m using the *.wsd extension for working sets. That way I can simply drag&drop the file into Eclipse to import it.

Other Kinds of Working Sets

Working sets do not stop at projects and files:

[Figure: Select Working Set Type]

I can create working sets of breakpoints or analysis/trace points. Or I can create working sets of any resource files or tasks. The possibilities are nearly endless and depend as well on the extra plugins installed.

Window Working Sets

And now back to the really cool part. One question remains: what are window working sets?

[Figure: Window Working Sets]

The thing is that every view and dialog has its own working set setting. In my example below I use a working set ‘ColdFire’ for the Projects view, but my search dialog has a ‘Kinetis’ working set configured:

[Figure: Two Different Working Sets]

Sometimes I want this, but not always. What I need is a ‘global’ working set. And here the window working set comes to my rescue. For this I’m going to add some menus and toolbars to make it really easy.

For this I choose the menu Window > Customize Perspective. In the command groups I enable ‘Window Working Set’. Additionally it is a good idea to enable ‘Working Set Manipulation’ as well:

[Figure: Window Working Set Commands]

In the same way I can enable the toolbar and menu visibility. This gives me added toolbars to switch between working sets and to add/remove things from a working set quickly:

[Figure: Working Set Toolbars]

In a similar way, it gives me menu access as well:

[Figure: Working Sets Menu]

And here is the trick: using ‘Window Working Set’ really means ‘using the global workbench working set’. To select the global workbench working set, I use the toolbar icon to switch between window (or workbench) working sets:

[Figure: Selecting Global Window or Workbench Working Set]

In the individual views I choose to use the window working set instead of a selection of working sets:

[Figure: Selected Global Window or Workbench Working Set]

Now my working set settings are shared and common for all views: if I switch the working set, it will switch for all views where I have set it to ‘Window Working Set’:

[Figure: Window Working Set Applied to Multiple Views]

That way my working set is the same across views, and switching between different project settings is done with a simple mouse click.

Summary

Working sets are an extremely powerful feature to get focus on a subset of things inside Eclipse, based on my workflow. As with many great Eclipse features, I need to know about it before I can really appreciate its power. Who knows how many other hidden treasures are buried in Eclipse? I hope this article helps to save you a few mouse clicks.

Happy work-setting!
May 31, 2012
by Erich Styger
· 68,830 Views
Spring Integration Gateways - Null Handling & Timeouts
Spring Integration (SI) Gateways

Spring Integration Gateways provide a semantically rich interface to message sub-systems. Gateways are specified using namespace constructs; these reference a specific Java interface that is backed by an object dynamically implemented at run-time by the Spring Integration framework. Furthermore, these Java interfaces can, if you so wish, be defined entirely independently of any Spring artefacts, in both code and configuration.

One of the primary advantages of using the SI gateway as an interface to message sub-systems is that it's possible to automatically adopt the benefit of rich, default and customisable gateway configuration. One such configuration attribute deserves further scrutiny and discussion, primarily because it's easy to misunderstand and misconfigure: default-reply-timeout.

Primary Motivator for Gateway Analysis

During recent consulting engagements, I've encountered a number of deployments that use Spring Integration Gateway specifications that may, in some circumstances, lead to production operational instability. This has often been in high-pressure environments or those where technology support is not backed by adequate training, testing, review or technology mentoring.

How do gateways behave in Spring Integration (R2.0.5)

One of the key sections regarding gateways in the Spring Integration manual clearly explains gateway semantics. Below is a 2-dimensional table of possible non-standard gateway returns for each of the scenarios that the SI Manual (r2.0.5) refers to.

Gateway Non-standard Responses

Runtime Events              | default-reply-timeout=x | default-reply-timeout=x | default-reply-timeout=null | default-reply-timeout=null
                            | Single-threaded         | Multi-threaded          | Single-threaded            | Multi-threaded
1. Long Running Process     | Thread Parked           | null returned           | Thread Parked              | Thread Parked
2. Null Returned Downstream | null returned           | null returned           | Thread Parked              | Thread Parked
3. void method Downstream   | null returned           | null returned           | Thread Parked              | Thread Parked
4. Runtime Exception        | Error handler invoked or exception thrown (all four configurations)

The key parts of this table are the conditions that lead to invoking threads being parked (noted in red in the original table), nulls returned (noted in orange) and exceptions (noted in green). Each contributor consists of configuration that is under the developer's control, deployed code that is under the developer's control and conditions that are usually not under the developer's control.

The column headings in the table above combine two configuration aspects. The first is default-reply-timeout, which is set by the SI configurer and is the amount of time that a client call is willing to wait for a response from the gateway. The second is the threading model: synchronous flows are represented by Single-threaded flows, asynchronous flows by Multi-threaded flows.

A synchronous, or single-threaded, flow is one in which the implicit input channel (gateway-request-channel) has no associated dispatcher configured.

An asynchronous, or multi-threaded, flow is one in which the explicit input channel has a dispatcher configured ("taskExecutor"). This task executor specifies a thread pool that supplies threads for execution, and configuring it on the channel marks a thread boundary. Note: this is not the only way of making channels asynchronous.

The other configuration attribute referenced is default-reply-timeout, which is set on the gateway namespace configuration.

Note that both of these runtime aspects are set by the configurer during SI flow design and implementation. They are entirely under developer control.

The 'Runtime Events' column indicates gateway-relevant runtime events that have to be considered during gateway configuration; these are obviously not under developer control.
Trigger conditions for these events are not as unusual as one may hope. 1. Long Running Processes It's not uncommon for thread pools to become exhausted because all pooled threads are waiting for an external resource accessed through a socket, this may be a long running database query, a firewall keeping a connection open despite the server terminating etc. There is significant potential for these types of trigger. Some long-running processes terminate naturally, sometimes they never completed - an application restart is required. 2. Null returned downstream A null may be returned from a downstream SI construct such as a Transformer, Service Activator or Gateway. A Gateway may return null in some circumstances such as following a gateway timeout event. 3. Void method downstream Any custom code invoked during an SI flow may use a void method signature. This can also be caused by configuration in circumstances where flows are determined dynamically at runtime. 4. Runtime Exception RuntimeException's can be triggered during normal operation and are generally handled by catching them at the gateway or allowing them to propagate through. The reason that they are coloured green in the table above is that they are generally much easier to handle than timeouts. Gateway Timeout Handling Strategies There are four possible outcomes from invoking a gateway with a request message, all of these as a result of specific runtime events: a) an ordinary message response, b) an exception message, c) a null or d) no-response. Ordinary business responses and exceptions are straight forward to understand and will not be covered further in this article. The two significant outcomes that will be explored further are strategies for dealing with nulls and no-response. Generally speaking, long running processes either terminate or not. 
Long running processes that terminate may eventually return a message through the invoked gateway or time out, depending on timeout configuration, in which case a null may be returned. The severity of this as a problem depends on throughput volume, the length of the long running process and system resources (thread-pool size).

Configuration exists for default-reply-timeout
In the case where a long running process is underway and a default-reply-timeout has been set, there is no problem to deal with as long as the long running process completes before the default-reply-timeout expires. However, if the long running process does not complete before that timeout expires, one of three outcomes will apply.

Firstly, if the long running process terminates after the reply timeout has expired, the gateway will already have returned null to the invoker, so the null response needs handling by the invoker. The thread handling the long-running process will be returned to the pool.

Secondly, if the long running process does not terminate and a reply timeout has been set, the gateway will return null to the gateway invoker but the thread executing the long-running process will not be returned to the pool.

Thirdly, and most significantly, if a default-reply-timeout has been configured but the long running process is running on the same thread as the invoker, i.e. synchronous channels supply messages to that process, the thread will not return and the default-reply-timeout has no effect.

In the most common processing scenario, a long running process completes either before or after the reply timeout expires. When a null is returned by the gateway, the invoker is forced to deal with a null response. It's often unacceptable to force gateway consumers to deal with null responses, and it is not necessary: with a little additional configuration, this can be avoided.
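The first of the three outcomes above can be imitated with plain JDK concurrency classes. The following is only a sketch under stated assumptions: ReplyTimeoutSketch and callWithReplyTimeout are hypothetical names, the executor stands in for a dispatcher's thread pool, and nothing here is Spring Integration API. It shows the caller being released with a null once the wait expires while the pooled thread carries on with the slow task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK illustration only; none of these names are Spring Integration API.
final class ReplyTimeoutSketch {

    // Runs the task on a pool thread and waits at most timeoutMillis for the reply.
    // Returns null when the timeout expires first, mirroring a gateway reply timeout.
    static <T> T callWithReplyTimeout(ExecutorService pool, Callable<T> task, long timeoutMillis) {
        Future<T> reply = pool.submit(task);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the caller is released; the pool thread keeps working
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            String fast = callWithReplyTimeout(pool, () -> "reply", 1000);
            String slow = callWithReplyTimeout(pool, () -> { Thread.sleep(500); return "late"; }, 50);
            System.out.println(fast + " / " + slow);
        } finally {
            pool.shutdown();
        }
    }
}
```

If the task were run on the calling thread instead of being submitted to the pool, there would be nothing to time out against, which is the third outcome described above.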
Absent Configuration for default-reply-timeout
The most significant danger exists around gateways that have no default-reply-timeout configuration set. A long running process or a null returned from downstream will mean that the invoking thread is parked. This is true for both synchronous and asynchronous flows and may ultimately force an application to be restarted, because the invoker thread pool is likely to be depleted if this continues to occur.

Spring Integration Timeout Handling Design Strategies
For those Spring Integration configuration designers who are comfortable with gateway invokers dealing with null responses and exceptions, and who set default-reply-timeouts on gateways, there's no need to read further. However, if you wish to provide clients of your gateway with a more predictable response, a couple of strategies exist for handling null responses from gateways so that invokers are protected from having to deal with them.

Firstly, the simplest solution is to wrap the gateway with a service activator. The gateway must have the default-reply-timeout attribute value set in order to avoid unnecessary parking of threads. In order to avoid the consequences of long-running threads, it's also very prudent to use a dispatcher soon after entry to the gateway - this breaks the thread boundary. Whilst this is a valid technical approach, the impact is that we have forced a different entry point to our message sub-system. Entry is now via a Service Activator rather than a Gateway. A side effect of this change is that the testing entry point changes. Integration tests that would normally reference a gateway to send a message now have to locate the backing implementation for the Service Activator, which is not ideal.

An alternative approach toward solving this problem would be to configure two gateways with a Service Activator between them. Only one of the gateways would be exposed to invokers, the outer one. Both Gateways would reference the same service interface.
The outer gateway specification would not specify the default-reply-timeout but would specify the input and output channels in the same way that a single gateway would. The Service Activator between the Gateways would handle null gateway responses and possibly any exceptions if preferred to the gateway error handler approach. An example is as follows: The Service Activator bean (enrollmentServiceGatewayHandler) deals with both null and exception responses from the adapted gateway (enrollmentServiceAdaptedGateway), in the situation where these are generated a business response detailing the error is generated. Spring Integration R2.1 Changes async-executor on gateway spec
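The null and exception handling performed by a handler bean between the two gateways might look roughly like the sketch below. All names are hypothetical (this is not the article's enrollmentServiceGatewayHandler), the inner gateway is modelled as a plain java.util.function.Function, and Response is a stand-in for whatever business response type the service interface really returns.

```java
import java.util.function.Function;

// Hypothetical names throughout; this is not the article's handler bean.
final class NullSafeHandlerSketch {

    // Minimal stand-in for a business response message.
    static final class Response {
        final boolean success;
        final String detail;

        Response(boolean success, String detail) {
            this.success = success;
            this.detail = detail;
        }
    }

    // Calls the inner gateway and maps a null reply or an exception to an error response,
    // so the outer caller always receives a non-null business response.
    static Response handle(Function<String, Response> innerGateway, String request) {
        try {
            Response reply = innerGateway.apply(request);
            return reply != null ? reply : new Response(false, "No reply before the timeout expired");
        } catch (RuntimeException e) {
            return new Response(false, "Downstream failure: " + e.getMessage());
        }
    }
}
```

The design choice being illustrated is that invokers only ever see one response type; whether exceptions are mapped here or left to the gateway error handler is a matter of preference, as the text notes.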
May 26, 2012
by Matt Vickery
· 24,365 Views · 1 Like
article thumbnail
Connection Pooling in a Java Web Application with Tomcat and NetBeans IDE
After my article Connection Pooling in a Java Web Application with Glassfish and NetBeans IDE, here are the instructions for Tomcat.

Requirements
  • NetBeans IDE (this tutorial uses NetBeans 7)
  • Tomcat (this tutorial uses Tomcat 7 that is bundled within NetBeans)
  • MySQL database
  • MySQL Java Driver

Steps
Assuming your MySQL database is ready, connect to it and create a database. Let's call it connpool:

    mysql> create database connpool;

Now we create and populate the table from which we will fetch the data:

    mysql> use connpool;
    mysql> create table data(id int(5) not null unique auto_increment, name varchar(255) not null);
    mysql> insert into data(name) values("Fred Flintstone"), ("Pink Panther"), ("Wayne Cramp"), ("Johnny Bravo"), ("Spongebob Squarepants");

That is it for the database part. We now create our web application. In NetBeans IDE, click File → New Project... Select Java Web → Web Application: Click Next and give the project the name TomPool. Click Next. Choose the server as Tomcat and, since we are not going to use any frameworks, click Finish. The project will be created and the start page, index.jsp, opened for us in the IDE.

Now we create the connection pooling parameters. In the Projects window, expand configuration files and open "context.xml". You will see that the IDE has added this code for us: Delete the last line: and then add the following to the context.xml file. I have explained the sections along the way. Make sure you edit your MySQL username and password appropriately:

Next, expand the Web Pages node, right-click WEB-INF → New → Other → XML → XML Document. Click Next and type web for the File Name. Click Next and choose Well-Formed Document, then Finish. You will now have the file "web.xml": Delete everything in the file and paste this code: MySQL Test App DB Connection connpool javax.sql.DataSource Container

That is it for the connection pool. We now edit our code to make use of it.
Edit index.jsp by adding this code just after the initial comments but before Edit the section of the page: Data in my Connection Pooled Database

Now, we test the connection pool by running the application:

If you want to have the one connection pool used in multiple applications, you need to edit the following two files:

1. /conf/web.xml
Just before the closing tag, add the code DB Connection connpool javax.sql.DataSource Container

2. /conf/context.xml
Just before the closing tag, add the code

Now you can use the pool without editing XML files in each of your applications. Just use the sample code as given in index.jsp.

That's it folks!
May 23, 2012
by Arthur Buliva
· 70,294 Views · 2 Likes
article thumbnail
The Limited Usefulness of AsyncContext.start()
Some time ago I came across the question What's the purpose of AsyncContext.start(...) in Servlet 3.0? Quoting the Javadoc of the aforementioned method: Causes the container to dispatch a thread, possibly from a managed thread pool, to run the specified Runnable.

To remind all of you, AsyncContext is the standard way, defined in the Servlet 3.0 specification, to handle HTTP requests asynchronously. Basically, an HTTP request is no longer tied to an HTTP thread, allowing us to handle it later, possibly using fewer threads. It turned out that the specification provides an API to handle asynchronous threads in a different thread pool out of the box. First we will see how this feature is completely broken and useless in Tomcat and Jetty - and then we will discuss why its usefulness is questionable in general.

Our test servlet will simply sleep for a given amount of time. This is a scalability killer in normal circumstances because, even though a sleeping servlet is not consuming CPU, the sleeping HTTP thread tied to that particular request consumes memory - and no other incoming request can use that thread. In our test setup I limited the number of HTTP worker threads to 10, which means that only 10 concurrent requests completely block the application (it is unresponsive from the outside) even though the application itself is almost completely idle. So clearly sleeping is an enemy of scalability.

    @WebServlet(urlPatterns = Array("/*"))
    class SlowServlet extends HttpServlet with Logging {
      protected override def doGet(req: HttpServletRequest, resp: HttpServletResponse) {
        logger.info("Request received")
        val sleepParam = Option(req.getParameter("sleep")) map {_.toLong}
        TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10)
        logger.info("Request done")
      }
    }

Benchmarking this code reveals that the average response times are close to the sleep parameter as long as the number of concurrent connections is below the number of HTTP threads.
Unsurprisingly, the response times begin to grow the moment we exceed the HTTP thread count. The eleventh connection has to wait for some other request to finish and release its worker thread. When the concurrency level exceeds 100, Tomcat begins to drop connections - too many clients are already queued.

So what about the fancy AsyncContext.start() method (do not confuse it with ServletRequest.startAsync())? According to the JavaDoc, I can submit any Runnable and the container will use some managed thread pool to handle it. This will help partially, as I no longer block HTTP worker threads (but another thread somewhere in the servlet container is still used). Quickly switching to an asynchronous servlet:

    @WebServlet(urlPatterns = Array("/*"), asyncSupported = true)
    class SlowServlet extends HttpServlet with Logging {
      protected override def doGet(req: HttpServletRequest, resp: HttpServletResponse) {
        logger.info("Request received")
        val asyncContext = req.startAsync()
        asyncContext.setTimeout(TimeUnit.MINUTES.toMillis(10))
        asyncContext.start(new Runnable() {
          def run() {
            logger.info("Handling request")
            val sleepParam = Option(req.getParameter("sleep")) map {_.toLong}
            TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10)
            logger.info("Request done")
            asyncContext.complete()
          }
        })
      }
    }

We first enable asynchronous processing and then simply move sleep() into a Runnable and, hopefully, a different thread pool, releasing the HTTP thread pool. A quick stress test reveals slightly unexpected results (here: response times vs. number of concurrent connections):

Guess what, the response times are exactly the same as with no asynchronous support at all (!) After closer examination I discovered that when AsyncContext.start() is called, Tomcat submits the given task back to... the HTTP worker thread pool, the same one that is used for all HTTP requests! This basically means that we have released one HTTP thread just to utilize another one milliseconds later (maybe even the same one).
There is absolutely no benefit in calling AsyncContext.start() in Tomcat. I have no idea whether this is a bug or a feature. On one hand, this is clearly not what the API designers intended. The servlet container was supposed to manage a separate, independent thread pool so that the HTTP worker thread pool is still usable. I mean, the whole point of asynchronous processing is to escape the HTTP pool. Tomcat pretends to delegate our work to another thread, while it still uses the original worker thread pool.

So why do I consider this to be a feature? Because Jetty is "broken" in exactly the same way... No matter whether this works as designed or is only a poor API implementation, using AsyncContext.start() in Tomcat and Jetty is pointless and only unnecessarily complicates the code. It won't give you anything; the application works exactly the same under high load as if there were no asynchronous logic at all.

But what about using this API feature on correct implementations like IBM WAS? It is better, but the API as it stands still doesn't give us much in terms of scalability. To explain again: the whole point of asynchronous processing is the ability to decouple an HTTP request from an underlying thread, preferably by handling several connections using the same thread. AsyncContext.start() will run the provided Runnable in a separate thread pool. Your application is still responsive and can handle ordinary requests while the long-running requests that you decided to handle asynchronously are processed in a separate thread pool. It is better; unfortunately, the thread pool and thread-per-connection idiom is still a bottleneck. For the JVM it doesn't matter what type of threads are started - they still occupy memory. So we are no longer blocking HTTP worker threads, but our application is no more scalable in terms of the number of concurrent long-running tasks we can support.
In this simple and unrealistic example with a sleeping servlet we can actually support thousands of concurrent (waiting) connections using Servlet 3.0 asynchronous support with only one extra thread - and without AsyncContext.start(). Do you know how? Hint: ScheduledExecutorService.

Postscriptum: Scala goodness
I almost forgot. Even though the examples were written in Scala, I haven't used any cool language features yet. Here is one: implicit conversions. Make this available in your scope:

    implicit def blockToRunnable[T](block: => T) = new Runnable {
      def run() {
        block
      }
    }

And suddenly you can use a code block instead of instantiating Runnable manually and explicitly:

    asyncContext start {
      logger.info("Handling request")
      val sleepParam = Option(req.getParameter("sleep")) map { _.toLong}
      TimeUnit.MILLISECONDS.sleep(sleepParam getOrElse 10)
      logger.info("Request done")
      asyncContext.complete()
    }

Sweet!
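One way to read the ScheduledExecutorService hint is sketched below in plain Java, without the servlet API so that it stays self-contained. The class and method names are hypothetical, and a CompletableFuture stands in for each suspended request; in a real Servlet 3.0 application the scheduled task would presumably write the response and call asyncContext.complete() instead. The point is that a single scheduler thread finishes every waiting request after its delay, so no thread sleeps per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: CompletableFuture stands in for a suspended HTTP request.
final class ScheduledCompletionSketch {

    // Schedules completion of each "request" after delayMillis on one scheduler thread
    // and returns how many of them were completed.
    static int completeAll(int requests, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                CompletableFuture<String> response = new CompletableFuture<>();
                // In a servlet, this task would write the response and complete the AsyncContext.
                scheduler.schedule(() -> { response.complete("done"); }, delayMillis, TimeUnit.MILLISECONDS);
                pending.add(response);
            }
            int completed = 0;
            for (CompletableFuture<String> response : pending) {
                if ("done".equals(response.join())) {
                    completed++;
                }
            }
            return completed;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completeAll(1000, 100));
    }
}
```

This only works because the "work" here is waiting; a request that genuinely needs CPU or a blocking call still needs a thread from somewhere.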
May 22, 2012
by Tomasz Nurkiewicz
· 17,524 Views · 1 Like
article thumbnail
Spring Integration: Splitter-Aggregator
Within Spring Integration, one form of EIP scatter-gather is provided by the splitter and aggregator constructs.
May 18, 2012
by Matt Vickery
· 47,607 Views · 2 Likes
article thumbnail
Functional Programming on the JVM
Introduction
In recent times, many programming languages that run on the JVM have emerged. Many of these languages support writing code in a functional style. Programmers have started realizing the benefits of functional programming and are beginning to rediscover the powerful style of this programming paradigm. The emergence of multiple languages on the JVM has only helped to reignite the strong interest in this paradigm.

Java at its core is an imperative programming language. However, in the recent past many new languages like Scala, Clojure, Groovy etc. have become popular which support a functional programming style and yet run on the JVM. None of these languages can be considered a pure functional language, since all of them allow Java code to be called from within them and Java on its own is not a functional language. Still, they have different degrees of support for writing code in a functional style and have their own benefits. Functional programming requires a different kind of thinking and has its own advantages as compared to imperative programming.

It seems that Java has also realized the advantages of functional programming and is slowly inching towards it. The first sign of this can be seen in the form of Lambda Expressions that will be supported in Java 8. Although it's too early to comment on this, as the draft for Java 8 is still under review and is expected to be released next year, it does show that Java plans to support a functional programming style going forward.

In this article we will first discuss what functional programming is and how it differs from imperative programming. Later we will see where each of the above-mentioned JVM-based programming languages, i.e. Scala, Clojure and Groovy, fits in the world of functional programming and what each of them has to offer. Finally, we will take a sneak peek at Java 8's lambda expressions.

Why Functional Programming?
Computers of the current era are shipped with multicore processors.
Going forward, the number of processors in a machine is only going to increase. The code we write today and tomorrow will probably never run on a single-processor system. In order to get the best out of this, software must be designed to make more and more use of concurrency and hence keep all available processors busy. Java does provide concurrency concepts like threads, synchronization, locks etc. to execute code in parallel. But the shared-memory multi-threading approach in Java often causes more trouble than it solves. JVM-based functional programming languages like Scala, Clojure, Groovy etc. look at these problems from a different angle and provide less complex and less error-prone solutions as compared to imperative programming. They provide immutability concepts out of the box and hence eliminate the need for synchronization and the associated risk of deadlocks or livelocks. Concepts like Actors, Agents and DataFlow variables provide high-level concurrency abstractions and make it very easy to write concurrent programs.

What is Functional Programming?
Functional Programming is a concept which treats functions as first class citizens. At the core of functional programming is immutability. It emphasizes the application of functions, in contrast to the imperative programming style, which emphasizes changes in state. Functional programming has no side effects, whereas programming in an imperative style can result in side-effects. Let's elaborate on each of these characteristics to understand the concept behind functional programming.

Immutable state - The state of an object doesn't change and hence need not be protected or synchronized. That might sound a bit awkward at first, since if nothing changes, one might think that we are not writing a useful program. However, that's not what immutable state means. In functional programming, a change in state occurs via a series of transformations, which keeps the object immutable and yet achieves the change in state.
Functions as first class citizens - There was a major shift in the way programs were written when object-oriented concepts came into the picture. Everything was conceptualized as an object, and any action to be performed was treated as a method call on objects. Hence a series of method calls is executed on objects to get the desired work done. In the functional programming world, it's more about thinking in terms of a chain of communication between functions than method calls on objects. This makes functions the first class citizens of functional programming, since everything is modelled around functions.

Higher-order functions - Functions in functional programming are higher-order functions, since the following actions can be performed with them.
1. Functions can be passed to functions as arguments.
2. Functions can be created within functions, just as objects can be created in functions.
3. Functions can be returned from functions.

Functions with no side-effects - In functional programming, function execution has no side-effects. In other words, a function will always return the same result for the same argument when called multiple times. It doesn't change anything outside its boundary and is also not affected by any external change outside its boundary. It doesn't change the input value and can only produce new output. Once the output has been produced and returned by the function, it also becomes immutable and cannot be modified by any other function. In other words, such functions support referential transparency, i.e. if a function takes an input and returns some output, multiple invocations of that function at different points in time will always return the same output as long as the input remains the same. This is one of the main motivations behind using a functional language, as it makes it easy to understand and predict the behaviour of a program.
Characteristics like immutability and the absence of side-effects are extremely helpful while writing multi-threaded code, and developers need not worry about synchronizing state. Hence functional code is very easy to distribute across multiple cores, as it doesn't have any side effects.

JVM based Functional Programming Languages
There are many JVM-based languages which support the functional programming paradigm. However, I intend to limit the discussion to the following.
  • Scala
  • Clojure
  • Groovy
  • Lambda Expressions in Java 8

Lambda Expressions is not a programming language but a feature that will be supported in Java 8. The reason for including it in this article is to emphasize the fact that, going forward, Java will also support writing code in a functional style.

Scala
Scala is a statically typed multi-paradigm programming language designed to integrate features of object-oriented programming and functional programming. Since it is static, one cannot change a class definition at run time, i.e. one cannot add new methods or variables at run time. However, Scala does provide functional programming concepts, i.e. immutability, higher-order functions, nested functions etc. Apart from supporting Java's concurrency model, it also provides the Actor model out of the box for event-based asynchronous message passing between objects. Code written in Scala gets compiled into very efficient bytecode which can then be executed on the JVM.

Creating an immutable list in Scala is very simple and doesn't require any extra effort. The "val" keyword does the trick.

    val numbers = List(1,2,3,4)

Functions can be passed as arguments. Let's see this with an example. Suppose we have a list of 10 numbers and we want to calculate the sum of all the numbers in the list.
    val numbers = List(1,2,3,4,5,6,7,8,9,10)
    val total = numbers.foldLeft(0){(a,b) => a+b }

As can be seen in the above example, we are passing a function that adds two variables "a" and "b" to another function, "foldLeft", which is provided by the Scala library on collections. We have also not used any iteration logic or temporary variable to calculate the sum. The "foldLeft" method eliminates the need to maintain state in a temporary variable, which would otherwise have been required if we were to write this code in the pure Java way (as shown below).

    int total = 0;
    for (int number : numbers) {
        total += number;
    }

Such a Scala function can easily be executed in parallel without any need for synchronization, since it does not mutate state. This was just a small example to showcase the power of Scala as a functional programming language. There are a whole lot of features available in Scala for writing code in a functional style.

Clojure
Clojure is a dynamic language with excellent support for writing code in a functional style. It is a dialect of the "lisp" programming language with an efficient and robust infrastructure for multithreaded programming. Clojure is predominantly a functional programming language, and features a rich set of immutable, persistent data structures. When mutable state is needed, Clojure offers a software transactional memory system and a reactive Agent system that ensure clean, correct multithreaded designs. Apart from this, since Clojure is a dynamic language, it allows class definitions to be modified at run time by adding new methods or modifying existing ones. This makes it different from Scala, which is a statically typed language.

Immutability is at the root of Clojure. A list in Clojure is immutable by default, so creating an immutable list does not require any extra effort; only the following needs to be done.
    (def numbers (list 1 2 3 4 5 6 7 8 9 10))

To add the numbers without maintaining state, the reduce function can be used as shown below.

    (reduce + 0 '(1 2 3 4 5 6 7 8 9 10))

As can be seen, adding a list of numbers requires just one line of code without mutating any state. This is the beauty of functional programming languages and plays an important role in parallel execution.

Groovy
Groovy is again a dynamic language with some support for functional programming. Amongst the 3 languages, Groovy can be considered the weakest in terms of functional programming features. However, because of its dynamic nature and close resemblance to Java, it has been widely accepted and is considered a good alternative to Java. Groovy does not provide immutable objects out of the box but has excellent support for higher-order functions. Immutable objects can be created with the annotation @Immutable, but it's far less flexible than the immutability support in Scala and Clojure. In Groovy, functions can be passed around just like any other variable in the form of Closures. The same example in Groovy can be written as follows.

    def numbers = [1,2,3,4,5,6,7,8,9,10]
    def total = numbers.inject(0){a,b -> a+b }

However, the point to be noted is that the variables "numbers" and "total" are not immutable and can be modified at any point in time. Hence writing multithreaded code can be a bit challenging. But Groovy does provide the concepts of Actors, Agents and DataFlow variables via a library called GPars (Groovy Parallel Systems), which reduces the challenges associated with multithreaded code to a great extent.

Java8 Lambda Expression
Java has finally realized the power of writing code in a functional style and is going to support the concept of closures starting from Java 8. JSR 335 - Lambda Expressions for the Java Programming Language aims to support programming in a multicore environment by adding closures and related features to the Java language.
So it will finally be possible to pass functions around like variables in pure Java code. Currently, if someone wants to try out and play around with lambda expressions, Project Lambda of OpenJDK provides a prototype implementation of JSR-335. The following code snippet should run fine with the OpenJDK Project Lambda compiler.

    ExecutorService executor = Executors.newCachedThreadPool();
    executor.submit(() -> { System.out.println("I am running"); });

As can be seen above, a closure (function) has been passed to the executor's submit method. It does not take any argument, and hence empty brackets () have been placed. This function just prints "I am running" when executed. Just as we can pass functions to a function, it will also be possible to create a closure within a function and return a closure from a function. I would recommend trying out OpenJDK to get a feel for lambda expressions, which are going to be part of Java 8.

Conclusion
So this was all about functional programming, its concepts, its benefits and the options available on the JVM for writing functional code. Functional programming requires a different mind-set and can be very useful if used correctly. Functional Programming along with Object Oriented Programming can be a jewel in the crown. As discussed, there are various options available for writing code in a functional style that can be executed on the JVM. The choice depends on various factors, and there is no one language that can be considered best in all aspects. However, one thing is for sure: going forward we are going to see more and more usage of functional programming.
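For comparison with the Scala, Clojure and Groovy folds, here is how the same ideas might look in Java 8 syntax. This is a hedged sketch rather than anything from the Project Lambda prototype the article refers to: it assumes the java.util.function and java.util.stream APIs as they exist in released Java 8, and the class and method names are made up for illustration. It covers a fold without a mutable accumulator, a function taking a function as an argument, and a function returning a function.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Illustrative names only; assumes the released Java 8 APIs.
final class FunctionalJavaSketch {

    // Fold without a mutable accumulator, like foldLeft / reduce / inject in the text.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, (a, b) -> a + b);
    }

    // A function that accepts another function as an argument.
    static int applyTwice(Function<Integer, Integer> f, int x) {
        return f.apply(f.apply(x));
    }

    // A function that creates and returns a new function.
    static Function<Integer, Integer> adder(int n) {
        return x -> x + n;
    }

    public static void main(String[] args) {
        List<Integer> numbers =
                Collections.unmodifiableList(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(sum(numbers));              // sum of 1..10
        System.out.println(applyTwice(adder(3), 10));  // adds 3 twice to 10
    }
}
```

Note that Collections.unmodifiableList only gives a read-only view; it is weaker than the persistent data structures described for Clojure.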
May 14, 2012
by Gagan Agrawal
· 29,371 Views
article thumbnail
Basic REST service in Apache CXF vs. Camel-CXF
This article demonstrates how to create/test a basic REST service in CXF vs. Camel-CXF. Given the range of configuration and deployment options, I'm focusing on building a basic OSGi bundle that can be deployed in Fuse 4.2 (ServiceMix). Basic knowledge of Maven, ServiceMix and Camel is assumed.

Apache CXF
For more details, see http://cxf.apache.org/docs/jax-rs.html. Here is an overview of the steps to get a basic example running...

1. add dependencies to your pom.xml

    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>2.3.0</version>
    </dependency>

2. setup the bundle-context.xml file

3. create a service bean class

    @Path("/example")
    public class ExampleBean {
        @GET
        @Path("/")
        public String ping() throws Exception {
            return "SUCCESS";
        }
    }

4. deploy and test
  • build the bundle using "mvn install"
  • start servicemix
  • deploy the bundle
  • open a browser to "http://localhost:9000/example" (should see "SUCCESS")

Camel-CXF
For details, see http://camel.apache.org/cxfrs.html. Here is an overview of the steps to get a basic example running...

1. add dependencies to your pom.xml

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
    </dependency>

2. setup the bundle-context.xml file com.example

3. create a RouteBuilder class

    public class ExampleRouter extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            from("cxfrs://http://localhost:9000?resourceClasses=" + ExampleResource.class.getName())
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        //custom processing here
                    }
                })
                .setBody(constant("SUCCESS"));
        }
    }

4. create a REST Resource class

    @Path("/example")
    public class ExampleResource {
        @GET
        public void ping() {
            //strangely, this method is not called, only serves to configure the endpoint
        }
    }

5. deploy and test
  • build the bundle using "mvn install"
  • start servicemix
  • deploy the bundle
  • open a browser to "http://localhost:9000/example" (should see "SUCCESS")

Unit Testing
To perform basic unit testing for either of these approaches, use the Apache HttpClient APIs by first adding this dependency to your pom.xml...

    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.0.1</version>
    </dependency>

Then, you can use these APIs to create a basic test to validate the REST services created above...

    String url = "http://localhost:9000/example";
    HttpGet httpGet = new HttpGet(url);
    HttpClient httpclient = new DefaultHttpClient();
    HttpResponse response = httpclient.execute(httpGet);
    String responseMessage = EntityUtils.toString(response.getEntity());
    assertEquals("SUCCESS", responseMessage);
    assertEquals(200, response.getStatusLine().getStatusCode());

Summary
Overall, the approaches are very similar, but you can use various combinations of Spring XML and Java APIs to set this up. I focused on a common approach to demonstrate the basics of each approach side-by-side. That being said, if you have requirements for complex REST services (security, interceptors, filters, etc.), I recommend grabbing a copy of Apache CXF Web Service Development and following some of the more complex examples on the Apache CXF and Camel-CXFRS pages. In practice, I've generally used Camel-CXF because it gives you the flexibility of integrating with other Camel components and allows you to leverage the rich routing features of Camel. I hope to cover more complex scenarios in future posts...
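The same GET-and-assert check can be tried without ServiceMix or the Apache HttpClient dependency. The sketch below is an assumption-laden stand-in, not the article's setup: it uses the JDK's built-in com.sun.net.httpserver.HttpServer in place of the deployed bundle, java.net.http.HttpClient (Java 11 or later) in place of Apache HttpClient, and a free port on 127.0.0.1 instead of port 9000. The class and method names are hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the deployed REST bundle plus a client-side check.
final class RestPingCheckSketch {

    // Starts a stand-in endpoint on a free port, GETs /example, and returns "status:body".
    static String pingOnce() {
        HttpServer server;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/example", exchange -> {
            byte[] body = "SUCCESS".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/example");
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() + ":" + response.body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(pingOnce());
    }
}
```

Against a real deployment only the client half is needed, pointed at whatever address the bundle actually listens on.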
May 14, 2012
by Ben O'Day
· 33,554 Views · 2 Likes
article thumbnail
EasyNetQ, a simple .NET API for RabbitMQ
After pondering the results of our message queue shootout, we decided to run with RabbitMQ. Rabbit ticks all of the boxes: it’s supported (by SpringSource, and ultimately VMware), it scales, and it has the features and performance we need. The RabbitMQ.Client provided by SpringSource is a thin wrapper that quite faithfully exposes the AMQP protocol, so it expects messages as byte arrays.

For the shootout tests, spraying byte arrays around was fine, but in the real world we want our messages to be .NET types. I also wanted to provide developers with a very simple API that abstracts away the Exchange/Binding/Queue model of AMQP and instead provides a simple publish/subscribe and request/response model. My inspiration was the excellent work done by Dru Sellers and Chris Patterson with MassTransit (the new V2.0 beta is just out).

The code is on GitHub here: https://github.com/mikehadlow/EasyNetQ

The API centres around an IBus interface that looks like this:

    /// <summary>
    /// Provides a simple Publish/Subscribe and Request/Response API for a message bus.
    /// </summary>
    public interface IBus : IDisposable
    {
        /// <summary>
        /// Publishes a message.
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="message">The message to publish</param>
        void Publish<T>(T message);

        /// <summary>
        /// Subscribes to a stream of messages that match a .NET type.
        /// </summary>
        /// <typeparam name="T">The type to subscribe to</typeparam>
        /// <param name="subscriptionId">
        /// A unique identifier for the subscription. Two subscriptions with the same subscriptionId
        /// and type will get messages delivered in turn. This is useful if you want multiple subscribers
        /// to load balance a subscription in a round-robin fashion.
        /// </param>
        /// <param name="onMessage">
        /// The action to run when a message arrives.
        /// </param>
        void Subscribe<T>(string subscriptionId, Action<T> onMessage);

        /// <summary>
        /// Makes an RPC style asynchronous request.
        /// </summary>
        /// <typeparam name="TRequest">The request type.</typeparam>
        /// <typeparam name="TResponse">The response type.</typeparam>
        /// <param name="request">The request message.</param>
        /// <param name="onResponse">The action to run when the response is received.</param>
        void Request<TRequest, TResponse>(TRequest request, Action<TResponse> onResponse);

        /// <summary>
        /// Responds to an RPC request.
        /// </summary>
        /// <typeparam name="TRequest">The request type.</typeparam>
        /// <typeparam name="TResponse">The response type.</typeparam>
        /// <param name="responder">
        /// A function to run when the request is received. It should return the response.
        /// </param>
        void Respond<TRequest, TResponse>(Func<TRequest, TResponse> responder);
    }

To create a bus, just use a RabbitHutch, sorry I couldn’t resist it :)

    var bus = RabbitHutch.CreateRabbitBus("localhost");

You can just pass in the name of the server to use the default Rabbit virtual host ‘/’, or you can specify a named virtual host like this:

    var bus = RabbitHutch.CreateRabbitBus("localhost/myVirtualHost");

The first messaging pattern I wanted to support was publish/subscribe. Once you’ve got a bus instance, you can publish a message like this:

    var message = new MyMessage {Text = "Hello!"};
    bus.Publish(message);

This publishes the message to an exchange named by the message type. You subscribe to a message like this:

    bus.Subscribe<MyMessage>("test", message => Console.WriteLine(message.Text));

This creates a queue whose name starts with ‘test_’ and binds it to the message type’s exchange. When a message is received, it is passed to the Action delegate. If there is more than one subscriber to the same message type named ‘test’, Rabbit will hand out the messages in a round-robin fashion, so you get simple load balancing out of the box. Subscribers to the same message type, but with different names, will each get a copy of the message, as you’d expect.

The second messaging pattern is an asynchronous RPC. You can call a remote service like this:

    var request = new TestRequestMessage {Text = "Hello from the client! "};
    bus.Request<TestRequestMessage, TestResponseMessage>(request, response =>
        Console.WriteLine("Got response: '{0}'", response.Text));

This first creates a new temporary queue for the TestResponseMessage. It then publishes the TestRequestMessage with a return address to the temporary queue. When the TestResponseMessage is received, it is passed to the Action delegate. RabbitMQ happily creates temporary queues and provides a return address header, so this was very easy to implement.

To write an RPC server, simply use the Respond method like this:

    bus.Respond<TestRequestMessage, TestResponseMessage>(request =>
        new TestResponseMessage { Text = request.Text + " all done!" });

This creates a subscription for the TestRequestMessage. When a message is received, the Func delegate is passed the request and returns the response. The response message is then published to the temporary client queue. Once again, scaling RPC servers is simply a question of running up new instances. Rabbit will automatically distribute messages to them.

The features of AMQP (and Rabbit) make creating this kind of API a breeze. Check it out and let me know what you think.
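The subscription semantics described above (same subscription name means round-robin, different names each get a copy) can be sketched with a toy in-memory model. This is only an illustration in Java, not EasyNetQ and not AMQP: the class ToyBus, its methods, and the use of plain strings as messages are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of the subscription semantics; hypothetical, not EasyNetQ. */
public class ToyBus {
    // One "queue" per subscriptionId; each queue holds its competing consumers.
    private final Map<String, List<Consumer<String>>> queues = new LinkedHashMap<>();
    // Round-robin cursor per queue.
    private final Map<String, Integer> cursors = new LinkedHashMap<>();

    public void subscribe(String subscriptionId, Consumer<String> onMessage) {
        queues.computeIfAbsent(subscriptionId, id -> new ArrayList<>()).add(onMessage);
        cursors.putIfAbsent(subscriptionId, 0);
    }

    public void publish(String message) {
        // Every queue receives a copy; within a queue, one consumer is picked in turn.
        for (Map.Entry<String, List<Consumer<String>>> queue : queues.entrySet()) {
            List<Consumer<String>> consumers = queue.getValue();
            int next = cursors.get(queue.getKey());
            consumers.get(next).accept(message);
            cursors.put(queue.getKey(), (next + 1) % consumers.size());
        }
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        bus.subscribe("test", m -> System.out.println("test worker 1: " + m));
        bus.subscribe("test", m -> System.out.println("test worker 2: " + m));
        bus.subscribe("audit", m -> System.out.println("audit: " + m));
        bus.publish("first");
        bus.publish("second");
    }
}
```

In this sketch the two "test" subscribers alternate, while the "audit" subscriber sees every message. A real broker does this across processes and machines, which is what makes scaling by "running up new instances" possible.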
May 13, 2012
by Mike Hadlow
· 11,259 Views
Eclipse Global Preferences
Eclipse is good, but like any other tool, it gets better after I have customized it for my special needs. Eclipse stores a lot of settings in the workspace; see my article about copy my workspace settings. But is there a way to apply some settings to every workspace? At least to the new ones? Importing and exporting the settings can get really tedious, as I have many workspaces. And indeed, there are global settings in Eclipse. And I want to have them changed…

Warning: changing Eclipse global preferences might break an Eclipse installation. So better have a backup of the changed files at hand!

I’m using here the Eclipse-based CodeWarrior for MCU10.2, but things are pretty much the same for any Eclipse-based product (see the documentation in defining your own global preferences).

Question: where are the global preferences stored?

The first thing to check is the eclipse\configuration\.settings folder: here some plugins store their global preferences. For example, the recent workspace settings are in org.eclipse.ui.ide.prefs:

    #Fri Apr 06 16:46:14 CEST 2012
    RECENT_WORKSPACES_PROTOCOL=3
    MAX_RECENT_WORKSPACES=10
    SHOW_WORKSPACE_SELECTION_DIALOG=true
    eclipse.preferences.version=1
    RECENT_WORKSPACES=c\:\\tmp\\wsp_test\nc\:\\tmp\\wsp_10.2

But what about all the other settings? Looking at the CodeWarrior installation, inside the eclipse folder, I find the cwide.ini file.

[Figure: cwide.ini file]

This file defines the Eclipse startup options for launching the IDE (cwide.exe for CodeWarrior). The interesting part is this line:

    -Declipse.pluginCustomization=cwide.properties

This tells Eclipse to use cwide.properties as a default configuration file. If I inspect that file, it has the following content:

    org.eclipse.debug.ui/org.eclipse.debug.ui.switch_perspective_on_suspend=always
    org.eclipse.debug.ui/org.eclipse.debug.ui.switch_to_perspective=always
    org.eclipse.ui.editors/spellingEngine=org.eclipse.cdt.internal.ui.text.spelling.CSpellingEngine

OK, that gives me an idea of what settings look like. But the question is: how do I find out the setting names and syntax? What works (most of the time) is the following approach:

1. Launch Eclipse with a new workspace.
2. Export the settings using File > Export > General > Preferences to a file.
3. Change the setting in Window > Preferences.
4. Export the settings using File > Export > General > Preferences to a different file.
5. Compare/inspect the exported information and find the settings.
6. Apply the settings to the cwide.properties file, without the /instance/ part.
7. Restart the IDE and check if it works with a new workspace.

The last check is necessary as not all settings might work that way; see this forum post.

This is maybe best illustrated with an example. I have configured my workspace to use 2 for tab width and to insert spaces for tabs:

[Figure: changed preferences for tabs]

If I compare the two exported .epf files, this gives me:

[Figure: diffing Eclipse preference files]

That means the two following lines configure what I have changed:

    /instance/org.eclipse.ui.editors/tabWidth=2
    /instance/org.eclipse.ui.editors/spacesForTabs=true

For the cwide.properties file I need to cut off the /instance/ part, so I add this to cwide.properties:

    # set tab width to 2
    org.eclipse.ui.editors/tabWidth=2
    # using spaces for tabs
    org.eclipse.ui.editors/spacesForTabs=true

Note: preferences are applied in the following order: global preferences, then local (workspace) preferences.

This does not overwrite an existing setting of my workspace. As I can see from the diff above, my initial workspace settings do not have any settings for tabWidth and spacesForTabs. Creating a new workspace will use and apply my new settings. But once a workspace has them, they will not be overwritten with new global ones. Which makes sense: the local settings win.

Note: post a comment if you know an elegant way to enforce/overwrite workspace settings with global ones.
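The compare-and-strip steps above can also be automated. The following is a minimal sketch in Java, assuming the two exported .epf files are plain Java properties files; the class name EpfDiff, its method names, and the sample key spellings are hypothetical and chosen only for illustration. It does not re-escape special characters in values, so treat the output as a starting point to review by hand.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

/** Hypothetical helper: turns the difference of two .epf exports into customization lines. */
public class EpfDiff {
    private static final String INSTANCE_PREFIX = "/instance/";

    static Properties parse(String epfText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(epfText));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    /** Returns "plugin/key=value" lines for every /instance/ key that is new or changed. */
    static List<String> customizationLines(String beforeEpf, String afterEpf) {
        Properties before = parse(beforeEpf);
        Properties after = parse(afterEpf);
        List<String> lines = new ArrayList<>();
        for (String key : new TreeSet<>(after.stringPropertyNames())) {
            String value = after.getProperty(key);
            boolean changed = !value.equals(before.getProperty(key));
            if (changed && key.startsWith(INSTANCE_PREFIX)) {
                lines.add(key.substring(INSTANCE_PREFIX.length()) + "=" + value);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        String before = "file_export_version=3.0\n";
        String after = "file_export_version=3.0\n"
                + "/instance/org.eclipse.ui.editors/tabWidth=2\n"
                + "/instance/org.eclipse.ui.editors/spacesForTabs=true\n";
        customizationLines(before, after).forEach(System.out::println);
    }
}
```

In practice you would read the two files from disk (for example with Files.readString) and append the printed lines to the customization file after checking them.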
May 12, 2012
by Erich Styger
· 18,514 Views · 1 Like
Apache Camel Tutorial—EIP, Routes, Components, Testing, and More
Learn how Apache Camel implements the EIPs and offers a standardized, internal domain-specific language (DSL) to integrate applications.
May 7, 2012
by Kai Wähner DZone Core CORE
· 135,257 Views · 4 Likes
Protect a REST Service Using HMAC (Play 2.0)
HMAC is a great tool for protecting a REST service. Read why.
May 7, 2012
by Jos Dirksen
· 32,284 Views
Java Thread Deadlock: A Case Study
This article describes the complete root cause analysis of a recent Java deadlock problem observed on a Weblogic 11g production system running on the IBM JVM 1.6. This case study also demonstrates the importance of mastering Thread Dump analysis skills, including for the IBM JVM Thread Dump format.

Environment specification
· Java EE server: Oracle Weblogic Server 11g & Spring 2.0
· OS: AIX 5.3
· Java VM: IBM JRE 1.6.0
· Platform type: Portal & ordering application

Monitoring and troubleshooting tools
· JVM Thread Dump (IBM JVM format)
· Compuware Server Vantage (Weblogic JMX monitoring & alerting)

Problem overview
A major stuck Threads problem was observed and reported by Compuware Server Vantage. It affected 2 of our Weblogic 11g production managed servers, causing application impact and timeout conditions for our end users.

Gathering and validation of facts
As usual, a Java EE problem investigation requires gathering technical and non-technical facts so we can either derive other facts and/or conclude on the root cause. Before applying a corrective measure, the facts below were verified in order to conclude on the root cause:
· What is the client impact? MEDIUM (only 2 managed servers / JVM affected out of 16)
· Recent change of the affected platform? Yes (new JMS related asynchronous component)
· Any recent traffic increase to the affected platform? No
· How does this problem manifest itself? A sudden increase of Threads was observed, leading to rapid Thread depletion
· Did a Weblogic managed server restart resolve the problem?
Yes, but the problem returns after a few hours (unpredictable & intermittent pattern)
- Conclusion #1: The problem is related to an intermittent stuck Threads behaviour affecting only a few Weblogic managed servers at a time
- Conclusion #2: Since the problem is intermittent, a global root cause such as a non-responsive downstream system is not likely

Thread Dump analysis – first pass
The first thing to do when dealing with stuck Thread problems is to generate a JVM Thread Dump. This is a golden rule regardless of your environment specifications & problem context. A JVM Thread Dump snapshot provides you with crucial information about the active Threads and what type of processing / tasks they are performing at that time.

Now back to our case study: an IBM JVM Thread Dump (javacore.xyz format) was generated, which revealed the following Java Thread deadlock condition:

    1LKDEADLOCK Deadlock detected !!!
    NULL ---------------------
    NULL
    2LKDEADLOCKTHR Thread "[STUCK] ExecuteThread: '8' for queue: 'weblogic.kernel.Default (self-tuning)'" (0x000000012CC08B00)
    3LKDEADLOCKWTR is waiting for:
    4LKDEADLOCKMON sys_mon_t:0x0000000126171DF8 infl_mon_t: 0x0000000126171E38:
    4LKDEADLOCKOBJ weblogic/jms/frontend/FESession@0x07000000198048C0/0x07000000198048D8:
    3LKDEADLOCKOWN which is owned by:
    2LKDEADLOCKTHR Thread "[STUCK] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'" (0x000000012E560500)
    3LKDEADLOCKWTR which is waiting for:
    4LKDEADLOCKMON sys_mon_t:0x000000012884CD60 infl_mon_t: 0x000000012884CDA0:
    4LKDEADLOCKOBJ weblogic/jms/frontend/FEConnection@0x0700000019822F08/0x0700000019822F20:
    3LKDEADLOCKOWN which is owned by:
    2LKDEADLOCKTHR Thread "[STUCK] ExecuteThread: '8' for queue: 'weblogic.kernel.Default (self-tuning)'" (0x000000012CC08B00)

This deadlock situation can be translated as follows:
- Weblogic Thread #8 is waiting to acquire an Object monitor lock owned by Weblogic Thread #10
- Weblogic Thread #10 is waiting to acquire an Object monitor
lock owned by Weblogic Thread #8

Conclusion: both Weblogic Threads #8 & #10 are waiting on each other, forever!

Now, before going any deeper in this root cause analysis, let me provide a high-level overview of Java Thread deadlocks.

Java Thread deadlock overview
Most of you are probably familiar with Java Thread deadlock principles, but have you ever experienced a true deadlock problem? From my experience, true Java deadlocks are rare, and I have only seen ~5 occurrences over the last 10 years. The reason is that most stuck Thread problems are due to Thread hanging conditions (waiting on a remote IO call, etc.) that do not involve a true deadlock condition with other Thread(s).

A Java Thread deadlock is, for example, a situation where Thread A is waiting to acquire an Object monitor lock held by Thread B, which is itself waiting to acquire an Object monitor lock held by Thread A. Both Threads will wait for each other forever.

[Diagram: Thread A and Thread B each waiting for the Object monitor lock held by the other]

Thread deadlock is confirmed…now what can you do?
Once the deadlock is confirmed (most JVM Thread Dump implementations will highlight it for you), the next step is to perform a deeper analysis by reviewing each Thread involved in the deadlock situation along with its current task & wait condition. Find below the partial Thread Stack Trace from our problem case for each Thread involved in the deadlock condition:

** Please note that the real application Java package name was renamed for confidentiality purposes **

Weblogic Thread #8

    "[STUCK] ExecuteThread: '8' for queue: 'weblogic.kernel.Default (self-tuning)'" J9VMThread:0x000000012CC08B00, j9thread_t:0x00000001299E5100, java/lang/Thread:0x070000001D72EE00, state:B, prio=1
    (native thread ID:0x111200F, native priority:0x1, native policy:UNKNOWN)
    Java callstack:
        at weblogic/jms/frontend/FEConnection.stop(FEConnection.java:671(Compiled Code))
        at weblogic/jms/frontend/FEConnection.invoke(FEConnection.java:1685(Compiled Code))
        at weblogic/messaging/dispatcher/Request.wrappedFiniteStateMachine(Request.java:961(Compiled Code))
        at weblogic/messaging/dispatcher/DispatcherImpl.syncRequest(DispatcherImpl.java:184(Compiled Code))
        at weblogic/messaging/dispatcher/DispatcherImpl.dispatchSync(DispatcherImpl.java:212(Compiled Code))
        at weblogic/jms/dispatcher/DispatcherAdapter.dispatchSync(DispatcherAdapter.java:43(Compiled Code))
        at weblogic/jms/client/JMSConnection.stop(JMSConnection.java:863(Compiled Code))
        at weblogic/jms/client/WLConnectionImpl.stop(WLConnectionImpl.java:843)
        at org/springframework/jms/connection/SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:342)
        at org/springframework/jms/connection/SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:296)
        at org/app/JMSReceiver.receive()
    ……………………………………………………………………

Weblogic Thread #10

    "[STUCK] ExecuteThread: '10' for queue: 'weblogic.kernel.Default (self-tuning)'" J9VMThread:0x000000012E560500, j9thread_t:0x000000012E35BCE0, java/lang/Thread:0x070000001ECA9200, state:B, prio=1
    (native thread ID:0x4FA027, native priority:0x1, native policy:UNKNOWN)
    Java callstack:
        at weblogic/jms/frontend/FEConnection.getPeerVersion(FEConnection.java:1381(Compiled Code))
        at weblogic/jms/frontend/FESession.setUpBackEndSession(FESession.java:755(Compiled Code))
        at weblogic/jms/frontend/FESession.consumerCreate(FESession.java:1025(Compiled Code))
        at weblogic/jms/frontend/FESession.invoke(FESession.java:2995(Compiled Code))
        at weblogic/messaging/dispatcher/Request.wrappedFiniteStateMachine(Request.java:961(Compiled Code))
        at weblogic/messaging/dispatcher/DispatcherImpl.syncRequest(DispatcherImpl.java:184(Compiled Code))
        at weblogic/messaging/dispatcher/DispatcherImpl.dispatchSync(DispatcherImpl.java:212(Compiled Code))
        at weblogic/jms/dispatcher/DispatcherAdapter.dispatchSync(DispatcherAdapter.java:43(Compiled Code))
        at weblogic/jms/client/JMSSession.consumerCreate(JMSSession.java:2982(Compiled Code))
        at weblogic/jms/client/JMSSession.setupConsumer(JMSSession.java:2749(Compiled Code))
        at weblogic/jms/client/JMSSession.createConsumer(JMSSession.java:2691(Compiled Code))
        at weblogic/jms/client/JMSSession.createReceiver(JMSSession.java:2596(Compiled Code))
        at weblogic/jms/client/WLSessionImpl.createReceiver(WLSessionImpl.java:991(Compiled Code))
        at org/springframework/jms/core/JmsTemplate102.createConsumer(JmsTemplate102.java:204(Compiled Code))
        at org/springframework/jms/core/JmsTemplate.doReceive(JmsTemplate.java:676(Compiled Code))
        at org/springframework/jms/core/JmsTemplate$10.doInJms(JmsTemplate.java:652(Compiled Code))
        at org/springframework/jms/core/JmsTemplate.execute(JmsTemplate.java:412(Compiled Code))
        at org/springframework/jms/core/JmsTemplate.receiveSelected(JmsTemplate.java:650(Compiled Code))
        at org/springframework/jms/core/JmsTemplate.receiveSelected(JmsTemplate.java:641(Compiled Code))
        at org/app/JMSReceiver.receive()
    ……………………………………………………………

As you can see in the above Thread Stack Traces, this deadlock originated from our application code
which is using the Spring framework API for the JMS consumer implementation (very useful when not using MDBs). The Stack Traces are quite revealing: both Threads are in a race condition against the same Weblogic JMS consumer session / connection, leading to a deadlock situation:
- Weblogic Thread #8 is attempting to reset and close the current JMS connection
- Weblogic Thread #10 is attempting to use the same JMS Connection / Session in order to create a new JMS consumer
- Thread deadlock is triggered!

Root cause: non Thread safe Spring JMS SingleConnectionFactory implementation
A code review and some quick research in the Spring JIRA bug database revealed the following thread-safety defect, which correlates perfectly with the above analysis:

# SingleConnectionFactory's resetConnection is causing deadlocks with underlying OracleAQ's JMS connection
https://jira.springsource.org/browse/SPR-5987

A patch for Spring SingleConnectionFactory was released back in 2009. It added a proper synchronized{} block in order to prevent a Thread deadlock in the event of a JMS Connection reset operation:

    synchronized (connectionMonitor) {
        // if condition added to avoid possible deadlocks when trying to reset the target connection
        if (!started) {
            this.target.start();
            started = true;
        }
    }

Solution
Our team is planning to integrate this Spring patch into our production environment shortly. The initial tests performed in our test environment are positive.

Conclusion
I hope this case study has helped you understand a real-life Java Thread deadlock problem and how proper Thread Dump analysis skills can allow you to quickly pinpoint the root cause of stuck Thread problems at the code level. Please don’t hesitate to post any comment or question.
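As a companion to the deadlock overview earlier in this article, here is a minimal sketch of the same lock-ordering pattern in plain Java, together with programmatic detection through the standard ThreadMXBean API. The class name DeadlockSketch, the thread names, and the two placeholder lock objects are hypothetical; they only mimic the session/connection monitors seen in the Thread Dump and are not the Weblogic or Spring classes.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo: two threads take the same two monitors in opposite order. */
public class DeadlockSketch {

    static void lockBoth(Object first, Object second, CountDownLatch bothHoldFirst) {
        synchronized (first) {
            bothHoldFirst.countDown();
            try {
                bothHoldFirst.await(); // wait until the other thread holds its first lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (second) { // blocks forever: the other thread owns this monitor
                // never reached
            }
        }
    }

    /** Provokes the deadlock on daemon threads and returns how many threads the JVM reports. */
    static int provokeAndCount() {
        Object session = new Object();    // stands in for the session monitor
        Object connection = new Object(); // stands in for the connection monitor
        CountDownLatch latch = new CountDownLatch(2);
        Thread a = new Thread(() -> lockBoth(session, connection, latch), "demo-thread-A");
        Thread b = new Thread(() -> lockBoth(connection, session, latch), "demo-thread-B");
        a.setDaemon(true); // daemon threads so the JVM can still exit
        b.setDaemon(true);
        a.start();
        b.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null when none are found
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("Deadlocked threads reported: " + provokeAndCount());
    }
}
```

The usual remedies are either to acquire the monitors in one agreed order everywhere, or to guard the whole operation with a single monitor, which is the direction the patch quoted above takes.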
May 6, 2012
by Pierre - Hugues Charbonneau
· 14,884 Views
6 Types of Monitoring
When you manage and develop infrastructure, you'll work with tests and monitoring solutions that ensure the quality of your end product (code or infrastructure). For code quality, you can have unit tests, functional tests, integration tests, etc. Similarly, you might have system monitoring, dependency monitoring, application monitoring, BAM, CEP, etc. In this post I'll describe a few of them:

System monitoring: Watches CPU load, free memory (RAM), disk space, etc. Also covers SNMP-based hardware monitoring.

Dependency monitoring: Checks web server processes, web server states, %CPU consumption, RSS, etc.

Integration: Tracks whether third-party or other integration points are available.

BAM: Business activity monitoring. Records KPIs (key performance indicators), which in turn define the state of your business quantitatively. This could include successful transactions per day or month.

Process instrumentation or tracing: Includes kprobes, SystemTap, and other tracing methodologies such as DTrace, which let you monitor at the individual method level. These are predominantly used for language or interpreter optimizations.

Complex event processing: Though not directly related, some of these monitoring solutions can or should use some form of complex event processing to deduce meaningful information. This only matters if the volume of data is large.

Depending upon your problem, you should employ one or more of these solutions. There are plenty of open source solutions and tools available for all of them. BAM is the tricky one: not BAM in itself, but defining the KPIs is the hard part.
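To make the system monitoring category concrete, here is a small sketch in Java that reads the load average and disk space of the local machine using only standard JDK calls. The class name SystemCheck, the diskOk helper, and the 10% free-space threshold are assumptions made for this example; a real setup would normally rely on a dedicated monitoring agent instead.

```java
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

/** Hypothetical one-shot system check; thresholds are illustrative only. */
public class SystemCheck {

    /** True when at least minFreeRatio of the disk is still usable. */
    static boolean diskOk(long usableBytes, long totalBytes, double minFreeRatio) {
        return totalBytes > 0 && (double) usableBytes / totalBytes >= minFreeRatio;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        double load = os.getSystemLoadAverage(); // negative when the platform cannot report it
        int cpus = os.getAvailableProcessors();

        File workingDir = new File(".");
        long usable = workingDir.getUsableSpace();
        long total = workingDir.getTotalSpace();

        System.out.println("Load average (1 min): " + load + " on " + cpus + " CPUs");
        System.out.println("Disk usable/total bytes: " + usable + "/" + total);
        System.out.println("Disk check passed: " + diskOk(usable, total, 0.10));
    }
}
```

Keeping the threshold logic in a small pure function such as diskOk makes the alerting rule easy to test separately from the code that collects the readings.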
May 5, 2012
by Ranjib Dey
· 56,027 Views
Spring Integration - Payload Storage via Claim-check
Continuing on the theme of temporary storage for transient messages used within Spring Integration flows, the claim-check model offers configurable storage for message payloads. The advantage of this Enterprise Integration pattern over header enrichment is that objects don't have to be packed into the message header. They can be stored in a local Java Map, an in-memory database (IMDB), a cache, or anything else that can be used to hold data.

Several advantages of this approach are evident.

Firstly, performance and efficiency. When using header enrichment, if message payloads need to be managed outside of the JVM that generates the enriched message header, the object will not be available unless it's serialised and transported around the distributed application. This could be costly in terms of performance and transport efficiency. The key factors here are the frequency of remote dispatch and the size of the header object. In specific circumstances the claim-check pattern may offer an advantage here: objects can be serialised and/or transformed into a storage-specific format and stored internally in memory or externally in a data store.

Secondly, accessibility. It's conceivable that message payloads undergoing claim-check processing may need to be accessed by third-party applications that are unable to receive Spring Integration messages. The claim-check pattern allows this type of processing to take place.

Thirdly, resiliency. A data store can be chosen that guarantees persistence for messages so that they can be recovered following a failure.
The following code details how the claim-check pattern can be used:

The gateway used is specified as the following Java class:

    package com.l8mdv.sample;

    import org.springframework.integration.Message;
    import org.springframework.integration.annotation.Gateway;

    public interface ClaimCheckGateway {

        public static final String CLAIM_CHECK_ID = "CLAIM_CHECK_ID";

        @Gateway (requestChannel = "claim-check-in-channel")
        public Message send(Message message);
    }

Lastly, this can all be tested by using the following JUnit test case:

    package com.l8mdv.sample;

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.integration.Message;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    import static com.l8mdv.sample.ClaimCheckGateway.CLAIM_CHECK_ID;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(
        locations = {"classpath:META-INF/spring/claim-check.xml"}
    )
    public class ClaimCheckIntegrationTest {

        @Autowired
        ClaimCheckGateway claimCheckGateway;

        @Test
        public void locatePayloadInHeader() {
            String payload = "Sample test message.";
            Message message = MessageBuilder.withPayload(payload).build();
            Message response = claimCheckGateway.send(message);
            Assert.assertTrue(response.getPayload().equals(payload));
            Assert.assertTrue(response.getHeaders().get(CLAIM_CHECK_ID) != null);
        }
    }
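Stripped of the framework, the claim-check idea itself is small: check a payload in, carry only a ticket, and exchange the ticket for the payload later. The following sketch uses a plain in-memory map and does not use Spring Integration; the class ClaimCheckSketch and its checkIn/checkOut methods are hypothetical names chosen for illustration, and the single-use ticket behaviour is a design choice of this sketch rather than something the article prescribes.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory claim-check store; a real one could sit on a database or cache. */
public class ClaimCheckSketch {
    private final Map<UUID, Object> store = new ConcurrentHashMap<>();

    /** Stores the payload (must not be null) and returns the ticket to travel with the message. */
    public UUID checkIn(Object payload) {
        UUID ticket = UUID.randomUUID();
        store.put(ticket, payload);
        return ticket;
    }

    /** Exchanges the ticket for the payload; each ticket can be redeemed only once. */
    public Object checkOut(UUID ticket) {
        Object payload = store.remove(ticket);
        if (payload == null) {
            throw new IllegalArgumentException("Unknown claim ticket: " + ticket);
        }
        return payload;
    }

    public static void main(String[] args) {
        ClaimCheckSketch claimCheck = new ClaimCheckSketch();
        UUID ticket = claimCheck.checkIn("Sample test message.");
        // Only the small ticket would be carried in the message header.
        System.out.println("Ticket: " + ticket);
        System.out.println("Payload: " + claimCheck.checkOut(ticket));
    }
}
```

Swapping the map for a persistent store is what would provide the resiliency benefit mentioned above, at the cost of a remote call per check-in and check-out.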
May 4, 2012
by Matt Vickery
· 14,012 Views
10 Best Eclipse Shortcuts
Looking for the best Eclipse shortcuts? Here are the top 10.
April 28, 2012
by Erich Styger
· 126,587 Views
Bridging between JMS and RabbitMQ (AMQP) using Spring Integration
An old customer recently asked me if I had a solution for integrating their existing JMS infrastructure on Websphere MQ with RabbitMQ. Although I know that RabbitMQ has the shovel plugin, which can bridge between Rabbit instances, I've not yet found a good plugin for JMS <-> AMQP forwarding. The first thing that came to my mind was to use a Spring Integration mediation, as SI has excellent support for both JMS and Rabbit.

Curious as I am, I started a PoC, and this is the result. It takes messages off a JMS queue and forwards them to an AMQP exchange that is bound to a queue the consumer application is supposed to listen to. I used an external HornetQ instance in JBoss 6.1 as the JMS provider, but I am confident that the same setup would work for Websphere MQ, as they both implement JMS.

Be aware that I've done no performance tweaking or QoS setup yet, as this is just a proof of concept. For a real setup you'd probably have to think about delivery guarantees versus performance, among other things. The code will be available at a GitHub repository near you soon.
SpringContext in XML (only the element values survive):

    org.jnp.interfaces.NamingContextFactory
    jnp://localhost:1099
    org.jnp.interfaces:org.jboss.naming
    ConnectionFactory

Maven POM (only the element values survive; dependencies are listed as groupId : artifactId : version):

    Model version: 4.0.0
    Project:       org.rl : si.jmstorabbit : 0.0.1-SNAPSHOT (jar)
    Name:          si.jmstorabbit
    URL:           http://maven.apache.org

    Properties:
        UTF-8
        hornet.version             = 2.2.5.Final
        spring.integration.version = 2.1.0.RELEASE

    Repositories:
        springsource-release   http://repository.springsource.com/maven/bundles/release   false
        springsource-external  http://repository.springsource.com/maven/bundles/external  false

    Dependencies:
        org.springframework.integration : spring-integration-core : ${spring.integration.version}
        org.springframework.integration : spring-integration-file : ${spring.integration.version}
        org.springframework.integration : spring-integration-amqp : ${spring.integration.version}
        org.springframework.integration : spring-integration-jms  : ${spring.integration.version}
        junit : junit : 3.8.1 (test)
        org.springframework : spring-context : 3.0.7.RELEASE
        jboss : jnp-client : 4.2.2.GA
        org.hornetq : hornetq-core-client : ${hornet.version}
        org.hornetq : hornetq-jms-client  : ${hornet.version}
        org.hornetq : hornetq-jms         : ${hornet.version}
        jboss : jboss-common-client : 3.2.3
        org.jboss.netty : netty : 3.2.7.Final
        javax.jms : jms : 1.1
April 24, 2012
by Billy Sjöberg
· 30,045 Views
Migrating From JMS to AMQP: RabbitMQ, Spring, Apache Camel, and Apache Qpid
As you know, I'm open-sourcing and completely overhauling my PhD system. One of my goals was to replace internal JMS queues with AMQP. Today I'll show you how I did it and why I was forced to switch from RabbitMQ to Apache Qpid.

AMQP
In short, AMQP is an open standard application layer protocol for message-oriented middleware. The most important feature is that AMQP is a wire-level protocol and is interoperable by design. JMS is just an API. Although JMS brokers can be used in .NET applications (see my post: ActiveMQ and .NET combined!), the JMS specification does not guarantee interoperability. Also, the AMQP standard is by design more flexible and powerful (e.g., it supports two-way communication by design) - they simply learnt from JMS mistakes :). Oh, forgot to mention. AMQP was originally developed by banks :) so I don't have to say that AMQP is secure, fault-tolerant, and so on.

RabbitMQ
RabbitMQ is the most mature AMQP broker. RabbitMQ is written in Erlang, so you have to download that first (the RabbitMQ Windows installer does it for you). Download it from here: http://www.rabbitmq.com/. I also recommend installing the web management console. From Rabbit's sbin directory execute:

    rabbitmq-plugins enable rabbitmq_management

If you're on Windows and you installed a Rabbit service, you have to restart it. That's it.

Spring
Well, it turned out that VMware bought RabbitMQ and SpringSource developers are now developing it. Given this fact, you shouldn't be surprised that Spring - RabbitMQ integration is childishly simple. Add the spring-rabbit dependency to your Maven project, and then in the Spring configuration paste the following:

The default configuration assumes that RabbitMQ is running on a local server using the default port and default credentials (guest/guest). Of course all these settings are configurable.

To send a message to the "myqueue" queue, just inject an instance of AmqpTemplate into your service and send the message.
An example would be:

    @Service
    public class HomeController {

        @Autowired
        private AmqpTemplate amqpTemplate;

        public void sendMessage(Bundle bundle) throws IOException {
            byte[] body = IOUtils.toByteArray(bundle.getInputStream());
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType(bundle.getContentType());
            messageProperties.setContentLength(bundle.getSize());
            messageProperties.setTimestamp(new Date());
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            Message message = new Message(body, messageProperties);
            amqpTemplate.send(message);
        }
    }

You can open the web console http://localhost:55672/mgmt/ and see 1 message in the "myqueue" queue.

Apache Camel
To read a message from Apache Camel you first have to add the camel-amqp dependency to your POM. Then just copy and paste the following route definition:

Run the route by executing mvn camel:run and... you'll see an error. To make a long story short, Apache Camel 2.9.0 doesn't work with RabbitMQ. This is because the camel-amqp component is using the Apache Qpid client under the hood. The current Qpid version is 0.14, but the Qpid guys forgot to upload new jars to the Maven public repo. Thus camel-amqp is still using Qpid 0.12, whose client doesn't seem to negotiate protocols. Even if you exclude the qpid-commons and qpid-client dependencies and explicitly add the Qpid 0.14 ones (download them and install them in your local repo), an exception will be thrown from the camel-amqp component, as there is no longer a default ConnectionFactory constructor. Thus I was forced to install Qpid.

Qpid
I downloaded the Java server and simply ran it. There is no web management console, but that's OK. You can use JConsole for JMX.

Spring AMQP and Qpid
In order to make Spring AMQP work with Qpid, copy and paste the following configuration:

As you can see in the above snippet, I explicitly created an AMQPComponent with connectionFactory set to an Apache Qpid AMQConnectionFactory object.
Source code and working example
This solution is a part of the Qualitas project. I use Spring MVC to handle uploads of business process bundles (e.g., a zipped archive of a WS-BPEL process) and send them to an AMQP queue. Then Apache Camel consumes the message, does additional processing of the bundle, and installs it on a remote business process execution engine. The projects you are most interested in are:

qualitas-webapp (Spring MVC sending messages to AMQP)
qualitas-internall-installation (Apache Camel route consuming messages from AMQP)

Check out the 0.0.2-SNAPSHOT tag from here: http://code.google.com/p/qualitas/source/browse/.

Qualitas
Read more about the Qualitas project here: http://code.google.com/p/qualitas/. Happy to welcome new developers on board!

cheers, Łukasz
April 17, 2012
by Łukasz Budnik
· 42,018 Views · 2 Likes
JMS Message Groups in Apache Camel
Message groups in JMS provide a way to identify a set of related messages. The messages could be related by anything - a customer order number, for example. Basically a JMS broker provides a guarantee that any messages that belong to a specific group will always be consumed by a common consumer. For instance, imagine that we’ve used the splitter pattern to split out line items from an order but want to aggregate those line items together later in a route. In order to perform that aggregation you need to guarantee that all of the messages being aggregated together are consumed by the same consumer. Below is an example of using message groups with ActiveMQ within Apache Camel. package com.brinksys.camel; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import java.util.concurrent.TimeUnit; public class App { private static BrokerService broker; public static void main(String[] args) throws Exception { try { startBroker(); CamelContext ctx = createCamelContext(); ctx.start(); ctx.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { /* Our direct route will take a message, and set the message to group 1 if the body is an integer, * otherwise set the group to 2. * * This demonstrates the following concepts: * 1) Header Manipulation * 2) Checking the payload type of the body and using it in a choice. 
 * 3) JMS Message groups */
                from("direct:begin")
                    .choice()
                        // Integer bodies go to group "1", everything else to group "2"
                        .when(body().isInstanceOf(Integer.class)).setHeader("JMSXGroupID", constant("1"))
                        .otherwise().setHeader("JMSXGroupID", constant("2"))
                    .end()
                    .to("amq:queue:Message.Group.Test");

                /* These two are competing consumers */
                from("amq:queue:Message.Group.Test").routeId("Route A").log("Received: ${body}");
                from("amq:queue:Message.Group.Test").routeId("Route B").log("Received: ${body}");
            }
        });
        sendMessages(ctx.createProducerTemplate());
        // Give the consumers time to drain the queue before shutting down
        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
        stopBroker();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static CamelContext createCamelContext() throws Exception {
    CamelContext camelContext = new DefaultCamelContext();
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost/");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setMaximumActive(500);
    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
    activeMQComponent.setUsePooledConnection(true);
    activeMQComponent.setConnectionFactory(pooledConnectionFactory);
    // Register the component under the "amq" scheme used by the routes
    camelContext.addComponent("amq", activeMQComponent);
    return camelContext;
}

private static void sendMessages(ProducerTemplate pt) throws Exception {
    // Ten Integer bodies (group 1), then ten String bodies (group 2)
    for (int i = 0; i < 10; i++) {
        pt.sendBody("direct:begin", Integer.valueOf(i));
    }
    for (int i = 0; i < 10; i++) {
        pt.sendBody("direct:begin", "next group");
    }
    // Interleave the two groups to show that each one stays on its consumer
    pt.sendBody("direct:begin", Integer.valueOf(1));
    pt.sendBody("direct:begin", "foo");
    pt.sendBody("direct:begin", Integer.valueOf(2));
}

private static void startBroker() throws Exception {
    broker = new BrokerService();
    broker.addConnector("vm://localhost");
    broker.start();
}

private static void stopBroker() throws Exception {
    broker.stop();
}
}

The result of running this main method is as follows:

2445 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 0
2447 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 1
2460 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 2
2466 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 3
2472 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 4
2479 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 5
2482 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 6
2485 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 7
2488 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 8
2490 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 9
2493 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2496 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2499 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2501 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2504 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2505 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2508 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2510 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2513 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2515 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: next group
2517 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 1
2535 [Camel (camel-1) thread #1 - JmsConsumer[Message.Group.Test]] INFO Route B - Received: foo
2538 [Camel (camel-1) thread #0 - JmsConsumer[Message.Group.Test]] INFO Route A - Received: 2

You’ll notice that all messages with a JMSXGroupID of 1 are consumed by one route (Route A in this run) and all messages with a JMSXGroupID of 2 are consumed by the other (Route B), even when the two groups are interleaved at the end. You’ll also see how simple it is to inspect the body of the original message, check its type, and set the header in the route that begins our orchestration. If you wish to run this source code, I’ve set up a small Git repository on GitHub for hosting some Camel examples. As of the time I write this, only the message group example is available, but others should appear soon.
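To make the grouping behavior in the log easier to reason about, here is a minimal plain-Java sketch of the idea. It is not ActiveMQ's or Camel's implementation: `MessageGroupSketch`, `groupIdFor`, and `StickyDispatcher` are hypothetical names invented for illustration, and the round-robin assignment of new groups to consumers is an assumption made only to keep the model simple. The sketch mirrors the route's `choice()` (Integer bodies get group "1", anything else gets group "2") and then pins each group to a single consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of message-group dispatch (not ActiveMQ's actual code). */
class MessageGroupSketch {

    /** Mirrors the route's choice(): Integer bodies map to group "1", all others to group "2". */
    static String groupIdFor(Object body) {
        return (body instanceof Integer) ? "1" : "2";
    }

    /** Pins every group to one consumer the first time that group is seen. */
    static final class StickyDispatcher {
        private final List<String> consumers;
        private final Map<String, String> owners = new LinkedHashMap<>();

        StickyDispatcher(List<String> consumers) {
            this.consumers = new ArrayList<>(consumers);
        }

        /** Returns the owning consumer; new groups are assigned round-robin (an assumption). */
        String consumerFor(String groupId) {
            String owner = owners.get(groupId);
            if (owner == null) {
                owner = consumers.get(owners.size() % consumers.size());
                owners.put(groupId, owner);
            }
            return owner;
        }
    }

    public static void main(String[] args) {
        StickyDispatcher dispatcher = new StickyDispatcher(Arrays.asList("Route A", "Route B"));

        // Same interleaved tail as sendMessages(): 1, "foo", 2
        Object[] bodies = { Integer.valueOf(0), "next group", Integer.valueOf(1), "foo", Integer.valueOf(2) };
        for (Object body : bodies) {
            String groupId = groupIdFor(body);
            System.out.println(dispatcher.consumerFor(groupId) + " - Received: " + body);
        }
    }
}
```

In this model, every Integer body lands on the same consumer and every String body on the other, no matter how the two are interleaved, which is the property the log output above demonstrates. In the real broker, which consumer ends up owning a given group is decided by ActiveMQ, so Route A and Route B could swap roles between runs.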
April 11, 2012
by Jason Whaley
· 15,332 Views