Producers and Consumers - Part 3 Poison Pills

By Roger Hughes · Mar. 18, 13

A couple of weeks ago I wrote part 2 of a short series of blogs on the Producer Consumer pattern. That blog focused on the need to close down my Teletype’s worker thread, fixing a bug in the original code from part 1 of the series.

The idea there is that the Teletype’s worker thread can be controlled by a command from the application’s main thread. This command tells the worker thread to shut down, thus allowing the app to shut down gracefully, as demonstrated by the code below:

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }

In this sample, the main thread calls the destroy() method, which sets the run variable to false and interrupts the worker’s blocking call to queue.take().

However, there’s a problem with this idea in certain circumstances. For example, will suddenly terminating the consumer’s worker thread cause problems in other parts of the system? Will there be data loss as important messages in the queue don’t get processed? If the answer to these questions is ‘yes’ then there’s another approach you can take: use a Poison Pill.

Poison Pill is a rather melodramatic name for simply placing a certain, known, data item on the queue and when the consumer reads this item it closes down. Obviously, the poison pill has to be the last item placed on the queue or else the consumer will shut down prematurely.
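As a minimal sketch of this idea, assuming a queue of plain strings rather than the Message class used in this series, the consumer below keeps taking items until it reads the pill. The class name PoisonPillSketch, the process(...) method and the use of a String as the pill are all illustrative assumptions, not part of the Teletype code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PoisonPillSketch {

  // Hypothetical sentinel: any value that real data can never equal will do.
  static final String POISON_PILL = "END OF FILE";

  /** Sends every update through a queue, then the pill, and returns what the consumer processed. */
  static List<String> process(List<String> updates) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<String> processed = new ArrayList<>();

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          String item = queue.take();
          if (POISON_PILL.equals(item)) {
            break; // the pill is the last item, so nothing is left unprocessed
          }
          processed.add(item);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "consumer");
    consumer.start();

    try {
      for (String update : updates) {
        queue.put(update);
      }
      queue.put(POISON_PILL); // must be the last item placed on the queue
      consumer.join();        // after join() it is safe to read 'processed'
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(process(List.of("94:59 Full time", "95:00 Final Score")));
  }
}
```

Running main prints [94:59 Full time, 95:00 Final Score]: every item queued ahead of the pill is processed before the consumer stops, which is the point of using a pill rather than an interrupt.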

This idea is great in simple systems with only one producer and one consumer as shown below:

[Diagram: a single producer and a single consumer]

...but takes a little more thought when there are multiple producers with a single consumer as in my football match updates scenario:

[Diagram: multiple producers and a single consumer]

...and could fall apart completely in the case of multiple producers and consumers:

[Diagram: multiple producers and multiple consumers]

...as ensuring that each consumer receives a poison pill at the right time and that all the data in the queue gets processed could be quite tricky.
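One common way of handling the many-to-many case, sketched here under assumptions and not taken from this series, is to let a coordinating thread wait for every producer to finish and only then queue one pill per consumer. Because the queue is first-in, first-out, all the real data sits ahead of the pills. The class MultiConsumerPillSketch and its process(...) method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class MultiConsumerPillSketch {

  static final String POISON_PILL = "END OF FILE"; // hypothetical sentinel

  /** Feeds each producer's data through consumerCount consumers; returns the items processed. */
  static List<String> process(List<List<String>> producerData, int consumerCount) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();

    List<Thread> consumers = new ArrayList<>();
    for (int i = 0; i < consumerCount; i++) {
      Thread consumer = new Thread(() -> {
        try {
          while (true) {
            String item = queue.take();
            if (POISON_PILL.equals(item)) {
              break; // each consumer swallows exactly one pill and stops
            }
            processed.add(item);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      consumers.add(consumer);
      consumer.start();
    }

    List<Thread> producers = new ArrayList<>();
    for (List<String> data : producerData) {
      Thread producer = new Thread(() -> data.forEach(queue::add));
      producers.add(producer);
      producer.start();
    }

    try {
      for (Thread producer : producers) {
        producer.join(); // all real data is now on the queue
      }
      for (int i = 0; i < consumerCount; i++) {
        queue.put(POISON_PILL); // one pill per consumer, behind all the data
      }
      for (Thread consumer : consumers) {
        consumer.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new ArrayList<>(processed);
  }
}
```

The order in which items are processed is not deterministic here, but every item is processed exactly once. The trade-off is that something (the coordinator in this sketch) has to know how many consumers there are.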

In this blog I’m updating my Teletype code to shut itself down once the two MatchReporters have sent all their data. The first thing to do is to decide on the message that will act as a poison pill. In the snippet below you can see that I’ve inserted a message that contains the text “END OF FILE” at the end of the match update stream.

  <value>95:30 END OF FILE</value>
  <value>95:00 Final Score  Fulham 0 - 1 Man Utd</value>
  <value>94:59 Full time The referee signals the end of the game.</value>

I’ve inserted one of these messages into each set of game data.

The next thing to do is to modify the Teletype code adding a check for the poison pill message:
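import java.util.concurrent.BlockingQueue;

// Assumption: @VisibleForTesting is Guava's annotation.
import com.google.common.annotations.VisibleForTesting;

// Message and PrintHead are the classes from the earlier parts of this series.
public class Teletype implements Runnable {

  private static final String POISON_PILL_MESSAGE = "END OF FILE";

  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  /** The number of matches, and therefore the number of poison pills to expect. */
  private final int matchesPlayed;

  private volatile boolean run = true;

  /** Only touched by the worker thread, so it needs no synchronization. */
  private int pillsRecieved;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue, int matchesPlayed) {
    this.queue = queue;
    this.printHead = printHead;
    this.matchesPlayed = matchesPlayed;
  }

  public void start() {

    Thread thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        handleMessage(message);
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
        // Without this the loop would go straight back to queue.take().
        run = false;
      }
    }
    printHead.print("Teletype Off.");
  }

  /** Prints ordinary messages; stops the loop once the final poison pill arrives. */
  private void handleMessage(Message message) {
    if (allGamesAreOver(message.getMessageText())) {
      run = false;
    } else {
      printHead.print(message.toString());
    }
  }

  /** Counts poison pills and returns true once there is one for every match. */
  private boolean allGamesAreOver(String messageText) {

    if (POISON_PILL_MESSAGE.equals(messageText)) {
      pillsRecieved++;
    }

    return pillsRecieved == matchesPlayed;
  }

  @VisibleForTesting
  boolean isRunning() {
    return run;
  }
}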
One of the most significant changes here is the addition of the matchesPlayed instance variable. This variable tells the Teletype how many MatchReporters there are supplying it with data. Ultimately this breaks the Producer Consumer pattern in that the consumer now knows about the rest of the system; however, it’s necessary because we need to ensure that the Teletype shuts down only once all the data has arrived. In a one-to-one system with a single producer and a single consumer this isn’t necessary.

The other big change in the Teletype code is to the run() loop. Once a message has been retrieved from the queue it’s passed to the new handleMessage(...) method. The handleMessage(...) method checks whether or not all the games it’s receiving data from are over by calling allGamesAreOver(...), which checks the message text against the poison pill string. If the message text is the poison pill string then the pillsRecieved counter is incremented. If pillsRecieved equals the matchesPlayed variable then all the games are over and allGamesAreOver(...) returns true. handleMessage(...) then sets the run instance variable to false and the worker thread’s run() method exits.
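To make the counting concrete, here is a small trace of the same check pulled out into a stand-alone class. PillCounterSketch is a hypothetical simplification for illustration only: it works on plain strings with no queue or threads, and it spells its counter pillsReceived where the Teletype code above uses pillsRecieved.

```java
final class PillCounterSketch {

  private static final String POISON_PILL_MESSAGE = "END OF FILE";

  private final int matchesPlayed;
  private int pillsReceived;

  PillCounterSketch(int matchesPlayed) {
    this.matchesPlayed = matchesPlayed;
  }

  /** Returns true once one pill per match has been seen. */
  boolean allGamesAreOver(String messageText) {
    if (POISON_PILL_MESSAGE.equals(messageText)) {
      pillsReceived++;
    }
    return pillsReceived == matchesPlayed;
  }

  public static void main(String[] args) {
    // Two matches, so two pills are needed before the consumer may stop.
    PillCounterSketch counter = new PillCounterSketch(2);
    String[] interleaved = {"Full time", "END OF FILE", "Final Score", "END OF FILE"};
    for (String text : interleaved) {
      System.out.println(text + " -> " + counter.allGamesAreOver(text));
    }
  }
}
```

With two matches, the first “END OF FILE” still yields false, so updates from the other match that arrive afterwards are printed as normal; only the second pill yields true.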

So that’s about it: the melodramatic Poison Pill pattern in a nutshell. Next time, Murder in the Red Barn.



The code for this sample is available on GitHub.

Published at DZone with permission of Roger Hughes, DZone MVB. See the original article here.
