Producers and Consumers - Part 3 Poison Pills
Join the DZone community and get the full member experience.
Join For FreeA couple of weeks ago I wrote part 2 of a short series of blogs on the Producer Consumer pattern. This blog focused upon the need to close down my Teletype’s worker thread, fixing a bug in the original code from part 1 of the series.
The idea here is that the Teletype’s worker thread can be
controlled by a command from the application’s main thread. This command
tells the worker thread to shutdown thus allowing the app the
gracefully shutdown as demonstrated by the code below:
@Override public void run() { while (run) { try { Message message = queue.take(); printHead.print(message.toString()); messageCount++; } catch (InterruptedException e) { printHead.print("Teletype closing down..."); } } printHead.print("Teletype Off."); } public void destroy() { run = false; thread.interrupt(); }
In this sample, the main thread calls the destroy() method, which sets the run variable to false and interrupts the worker’s blocking call to queue.take().
However, there’s a problem with this idea in certain circumstances. For
example, will suddenly terminating the consumer’s worker thread cause
problems in other parts of the system? Will there be data loss as
important messages in the queue don’t get processed? If the answer to
these questions is ‘yes’ then there’s another approach you can take: use
a Poison Pill.
Poison Pill is a rather melodramatic name for simply placing a
certain, known, data item on the queue and when the consumer reads this
item it closes down. Obviously, the poison pill has to be the last item placed on the queue or else the consumer will shut down prematurely.
This idea is great in simple systems with only one producer and consumer as shown below:
...but takes a little more thought when there are multiple producers with a single consumer as in my football match updates scenario:
...and could fall apart completely in the case of multiple produces and consumers:
... as ensuring that each consumer receives a poison pill at the right time and all the data in the queue gets processed could be quite tricky.
In this blog I’m updating my Teletype code to shut itself down once the two MatcherReporters have sent all their data. The first thing to do is to decide on the message that will act as a poison pill. In the snippet below you can see that I’ve inserted a message that contains the text “END OF FILE” at the end of the match update stream.
<value>95:30 END OF FILE</value> <value>95:00 Final Score Fulham 0 - 1 Man Utd</value> <value>94:59 Full time The referee signals the end of the game.</value>I’ve inserted one of these messages into each set of game data.
The next thing to do is to modify the Teletype code adding a check for the poison pill message:
public class Teletype implements Runnable { private static final String POISON_PILL_MESSAGE = "END OF FILE"; private final BlockingQueue<Message> queue; private final PrintHead printHead; private final int matchesPlayed; private volatile boolean run = true; private int pillsRecieved; public Teletype(PrintHead printHead, BlockingQueue<Message> queue, int matchesPlayed) { this.queue = queue; this.printHead = printHead; this.matchesPlayed = matchesPlayed; } public void start() { Thread thread = new Thread(this, "Studio Teletype"); thread.start(); printHead.print("Teletype Online."); } @Override public void run() { while (run) { try { Message message = queue.take(); handleMessage(message); } catch (InterruptedException e) { printHead.print("Teletype closing down..."); } } printHead.print("Teletype Off."); } private void handleMessage(Message message) { if (allGamesAreOver(message.getMessageText())) { run = false; } else { printHead.print(message.toString()); } } private boolean allGamesAreOver(String messageText) { if (POISON_PILL_MESSAGE.equals(messageText)) { pillsRecieved++; } return pillsRecieved == matchesPlayed ? true : false; } @VisibleForTesting boolean isRunning() { return run; } }One of the most significant changes here is the addition of the matchesPlayed instance variable. This variable tells the Teletype how many MatchReporters there are supplying it with data. Ultimately this breaks the Producer Consumer pattern in that the consumer now knows about the rest of the system; however, it’s necessary because we need to ensure that the Teletype shuts down at the end of all the data. In a single producer/consumer one to one system this isn’t necessary.
The other big change in the Teletype code is to the run() loop. Once a message has been retrieved from the queue it’s passed to the new handleMessage(...) method. The handleMessage(...) method checks whether or not all the games it’s receiving data from are over by calling allGamesAreOver(...), which checks the message text against the poison pill string. If the message test is the poison pill string then the pillsRecieved counter is updated. If the pillsRecieved equals the matchesPlayed variable then all the all the games are over and allGamesAreOver(...) returns true. This sets the run instance variable to false and the worker thread’s run() method exits.
So that’s about it, the melodramatic Poison Pill pattern in a nutshell, next time Murder in the Red Barn.
The code for this sample is available on GitHub.
Published at DZone with permission of Roger Hughes, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments