Java: BlockingQueues and Continuous Monitoring
Explore the BlockingQueue interface and see how a single dedicated thread can serve requests from multiple producers while monitoring the queue continuously.
In Java, the BlockingQueue interface is available in the java.util.concurrent package. BlockingQueue implementations are designed to be used primarily for producer-consumer queues with thread-safety. They can safely be used with multiple producers and multiple consumers.
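As a minimal sketch of that thread-safety claim, the snippet below lets several producer threads put items into one queue while the calling thread consumes them. The class name MultiProducerDemo and the method produceAndConsume are hypothetical names chosen for illustration; they are not part of the article's example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's example code.
class MultiProducerDemo {

    // Starts `producers` threads that each put `perProducer` items,
    // then consumes every item on the calling thread and returns the count.
    static int produceAndConsume(int producers, int perProducer) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put("producer-" + id + "-item-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        int consumed = 0;
        try {
            // take() blocks until some producer has put an item
            while (consumed < producers * perProducer) {
                queue.take();
                consumed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + produceAndConsume(3, 10) + " items");
    }
}
```

No explicit locking is needed in the producers or the consumer; the queue implementation handles the synchronization.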
We can find many BlockingQueue examples in various forums and articles. In this article, we explain how to monitor a queue continuously and how to process each request as soon as it arrives in the queue.
Instead of maintaining separate producer and consumer classes, we can use a single service class that both puts requests in the queue and takes them out for processing. The service starts one dedicated thread for the BlockingQueue. That thread monitors the queue continuously and keeps running until the server is terminated.
A BlockingQueue's capacity can be set when the object is constructed. It should be chosen based on the available heap size, because every queued request holds memory until it is consumed. A LinkedBlockingQueue created without a capacity argument is bounded only by Integer.MAX_VALUE.
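To illustrate the effect of a capacity, here is a small sketch using an ArrayBlockingQueue, whose constructor requires one. The class BoundedQueueDemo and its method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's example code.
class BoundedQueueDemo {

    // Offers `attempts` items to a queue with the given capacity and
    // returns how many of them the queue accepted.
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Nothing consumes from the queue, so only `capacity` offers succeed
        System.out.println("Accepted " + acceptedOffers(2, 5) + " of 5 offers");
    }
}
```

With put() instead of offer(), the producer would block on the third item until a consumer made room, which is how a bounded queue applies back-pressure.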
Now, let's go over the steps of how to create a BlockingQueue and continuously monitor and process requests.
Step 1: EventData
Create an EventData POJO class. Producers use it to store event data in the queue, and consumers use it to retrieve that data from the queue for further processing.
package com.dzone.blockingqueue.example;

// Plain data holder for one event that travels through the queue
class EventData {
    private String eventID;
    private String eventName;
    private String eventDate;
    private String eventType;
    private String eventLocation;

    public String getEventID() {
        return eventID;
    }
    public void setEventID(String eventID) {
        this.eventID = eventID;
    }
    public String getEventName() {
        return eventName;
    }
    public void setEventName(String eventName) {
        this.eventName = eventName;
    }
    public String getEventDate() {
        return eventDate;
    }
    public void setEventDate(String eventDate) {
        this.eventDate = eventDate;
    }
    public String getEventType() {
        return eventType;
    }
    public void setEventType(String eventType) {
        this.eventType = eventType;
    }
    public String getEventLocation() {
        return eventLocation;
    }
    public void setEventLocation(String eventLocation) {
        this.eventLocation = eventLocation;
    }
}
Step 2: QueueService
Create a QueueService singleton class, which puts requests in the queue and also takes them from the queue in order to process them.
package com.dzone.blockingqueue.example;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueService {
    private static QueueService instance = null;
    private static BlockingQueue<EventData> eventQueue = null;

    private QueueService() {}

    // synchronized so that concurrent callers cannot create two instances
    public static synchronized QueueService getInstance() {
        if (instance == null) {
            instance = new QueueService();
        }
        return instance;
    }

    // Lazily creates the queue and starts the monitoring thread.
    // synchronized so that only one queue and one EventProcessor are created.
    private synchronized void initialize() {
        if (eventQueue == null) {
            eventQueue = new LinkedBlockingQueue<EventData>();
            EventProcessor eventProcessor = new EventProcessor();
            eventProcessor.start();
        }
    }

    public void putEventInQueue(EventData eventData) {
        try {
            initialize();
            eventQueue.put(eventData);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            // Restore the interrupt flag so the caller can react to it
            Thread.currentThread().interrupt();
        }
    }

    class EventProcessor extends Thread {
        @Override
        public void run() {
            for (;;) {
                EventData eventData = null;
                try {
                    // Blocks while the queue is empty
                    eventData = eventQueue.take();
                    System.out.println("Process Event Data : Type : " + eventData.getEventType() + " / Name : " + eventData.getEventName());
                } catch (InterruptedException ex) {
                    // Log and keep looping so the monitoring thread stays alive
                    ex.printStackTrace();
                }
            }
        }
    }
}
Here, we have created a static BlockingQueue variable. It is initialized on first use with either an ArrayBlockingQueue or a LinkedBlockingQueue, depending on our requirements. After that, the same object is used for putting requests in the queue and taking them out.
We also created an EventProcessor inner class that extends Thread. It is started when our BlockingQueue is initialized. A for loop in the EventProcessor monitors the queue. The advantage of a BlockingQueue is that take() puts the calling thread in a waiting state while the queue is empty, so the for loop does not iterate continuously. When a request arrives in the queue, the thread resumes and consumes it.
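The blocking behavior of take() can be seen in isolation with the following sketch. BlockingTakeDemo and takeWaitsForPut are hypothetical names, and the 100 ms pause is an arbitrary value chosen only to give the consumer time to reach take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's example code.
class BlockingTakeDemo {

    // Starts a consumer on an empty queue, puts one message later,
    // and returns what the consumer received.
    static String takeWaitsForPut(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                // The thread waits here while the queue is empty
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(100);   // the consumer is waiting, not spinning
            queue.put(message);  // wakes the consumer up
            consumer.join();     // join() makes received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Consumer received: " + takeWaitsForPut("hello"));
    }
}
```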
A single EventProcessor thread handles all requests in a particular queue. This thread never terminates on its own, which is what enables continuous monitoring.
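The article's thread runs until the server terminates. If an application needs to stop such a monitoring thread itself, one possible approach (an assumption here, not something the article's QueueService does) is to treat an interrupt as the stop signal. The names StoppableMonitorDemo and processThenStop are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's example code.
class StoppableMonitorDemo {

    // Processes `events` items on a monitoring thread, then stops the
    // thread with an interrupt and returns how many items it handled.
    static int processThenStop(int events) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch allProcessed = new CountDownLatch(events);

        Thread monitor = new Thread(() -> {
            try {
                for (;;) {
                    queue.take();                 // blocks while the queue is empty
                    processed.incrementAndGet();
                    allProcessed.countDown();
                }
            } catch (InterruptedException e) {
                // Interrupt is the stop signal: leave the loop and end the thread
            }
        });
        monitor.start();

        try {
            for (int i = 0; i < events; i++) {
                queue.put("event-" + i);
            }
            allProcessed.await();   // wait until every event has been handled
            monitor.interrupt();    // take() throws InterruptedException
            monitor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed " + processThenStop(5) + " events before shutdown");
    }
}
```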
We also created a public putEventInQueue method in our QueueService, which callers reach through the static getInstance method. Inside the method, the request is put in the BlockingQueue. The EventProcessor thread, which is monitoring the queue, then takes the request automatically for further processing.
Step 3: EventService
Now it's time to load data into the queue. We have written an EventService class that puts a number of requests into our BlockingQueue. The QueueService from Step 2 shows how each request is consumed in order to process it.
package com.dzone.blockingqueue.example;

public class EventService {
    public static void main(String[] args) {
        try {
            EventData event = null;
            // Produce 100 events, one every 100 ms
            for (int i = 0; i < 100; i++) {
                event = new EventData();
                event.setEventType("EventType " + i);
                event.setEventName("EventName " + i);
                QueueService.getInstance().putEventInQueue(event);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Step 4: EventProcessor Output
If we run the example, the EventProcessor prints one line for each of the 100 events. The first five lines are:
Process Event Data : Type : EventType 0 / Name : EventName 0
Process Event Data : Type : EventType 1 / Name : EventName 1
Process Event Data : Type : EventType 2 / Name : EventName 2
Process Event Data : Type : EventType 3 / Name : EventName 3
Process Event Data : Type : EventType 4 / Name : EventName 4