TCP/IP Sockets to monitor daemon processes.

When I finished my previous article a few weeks ago, I thought it would be a good idea to talk about sockets next. Daemon processes suffer from a lack of visibility into what they are doing, and it is often difficult to get information out of them, so linking the two articles seemed like a perfect fit.

I remember discovering socket communication back in the late 1990s and realizing how it could help back-end processes provide information to the outside world. I started using it very often, and it was a complete success, since the company I was working for didn't know that was possible. Monitoring those processes used to be very tedious and time-consuming, so it made a huge difference for us. Within half a year, all of our processes were not only monitored through socket clients; we could also increase the number of threads, silently stop the daemon, shorten the interval at which the producer wakes up to produce tasks, and even change the settings that controlled how the process automatically slowed down or sped up based on task-demand statistics.

Sockets have been around for a long time, and they remain a wonderful and powerful tool in many ways. We surely use them hundreds of times a day without even knowing it.

Let's start writing code now. I'll use the code from my previous article so we don't have to spend time writing and understanding another back-end process.

The first thing we are going to take care of is the server socket that will live inside the back-end process. The daemon needs a separate thread that receives connection requests by listening on one particular TCP port and then creates a new thread to attend to each incoming connection. In other words, every client connected to our process will have its own thread living inside the daemon, devoted to that client.


Class ServerSocketConnection.java:

package com.test.multithread;

import java.net.*;
import java.io.*;

public class ServerSocketConnection extends Thread {
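	// volatile: these fields are written by one thread and read by another
	private volatile boolean listening = true;
	private volatile ServerSocket serverSocket = null;
	private volatile TaskProducer taskProducer;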

	@Override
	public void run() {
		try {
			serverSocket = new ServerSocket(3232);

			while (listening) {
				new SocketConnectionThread(serverSocket.accept(), taskProducer).start();
			}

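		} catch (final IOException e) {
			// close() makes accept() throw; that is the normal shutdown path once
			// listening has been set to false, so only report a real failure
			if (listening) {
				System.err.println("Could not listen on port: 3232.");
				System.exit(-1);
			}
		}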
	}

	public void setProcessingThread(final TaskProducer ep) {
		this.taskProducer = ep;
	}

	public boolean isListening() {
		return listening;
	}

	public void setListening(final boolean listening) {
		this.listening = listening;
	}

	public void close() {
		try {
			// serverSocket stays null until run() has opened it
			if (serverSocket != null) {
				serverSocket.close();
			}
		} catch (final IOException e) {
			e.printStackTrace();
		}
	}
}


The run method is where the important action happens. The thread blocks in the call to serverSocket.accept() while it waits for a new client to request a connection. Once a connection arrives, the thread is unblocked and a new SocketConnectionThread is created. This new object listens to the requests of that particular client and uses the reference to the taskProducer to process its commands.
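
The blocking behavior of accept(), and the way it can be interrupted, can be tried in isolation. The following is a minimal sketch, not part of the daemon: the class name AcceptShutdownDemo and the use of port 0 (which asks the operating system for any free port) are assumptions made for illustration. It relies on the documented behavior that closing a ServerSocket from another thread makes a blocked accept() throw a SocketException.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, not part of the article's daemon.
class AcceptShutdownDemo {

	// Returns true when the accept loop ended because the socket was closed on purpose.
	static boolean runAndStop() throws IOException, InterruptedException {
		final ServerSocket server = new ServerSocket(0); // port 0: let the OS pick a free port
		final boolean[] stoppedCleanly = { false };

		final Thread acceptor = new Thread(() -> {
			try {
				while (true) {
					// Blocks here until a client connects or the socket is closed.
					final Socket client = server.accept();
					client.close();
				}
			} catch (final IOException e) {
				// Reached once close() is called from another thread.
				stoppedCleanly[0] = server.isClosed();
			}
		});
		acceptor.start();

		// One client connection exercises the loop before we stop it.
		new Socket("127.0.0.1", server.getLocalPort()).close();

		server.close();      // unblocks accept() in the acceptor thread
		acceptor.join(5000); // wait for the acceptor to finish
		return stoppedCleanly[0];
	}

	public static void main(final String[] args) throws Exception {
		System.out.println("Stopped cleanly: " + runAndStop());
	}
}
```

This is why the daemon's close() method matters: flipping the listening flag alone does not wake a thread that is already waiting inside accept().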

Now let's make sure this new class is used by the main class of the daemon:

Class ProducerConsumerPattern.java:

package com.test.multithread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerPattern {

	private final int queueCapacity = 200;
	private int numberOfThreads = 10;

	public static void main(final String[] args) {
		new ProducerConsumerPattern(20);
	}

	public ProducerConsumerPattern(final int numberOfThreads) {
		if (numberOfThreads <= 0 || numberOfThreads > 100) 
			throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");

		this.numberOfThreads = numberOfThreads;

		// Creating shared object
		final BlockingQueue<Long> sharedQueue = new LinkedBlockingQueue<Long>(queueCapacity);

		// Creating and starting the Consumer Threads
		final List<TaskConsumer> consumers = new ArrayList<TaskConsumer>();
		// "<" rather than "<=" so that exactly numberOfThreads consumers are created
		for (int i = 0; i < this.numberOfThreads; i++) {
			final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
			consThread.start();
			consumers.add(consThread);
		}

		final ServerSocketConnection socketServer = new ServerSocketConnection();

		// Creating and starting the Producer Thread
		final TaskProducer prodThread = new TaskProducer(sharedQueue, socketServer, consumers);
		prodThread.start();

		socketServer.setProcessingThread(prodThread);
		socketServer.start();
	}
}


As you can see, three statements were added to the original code: the one that creates the ServerSocketConnection, and the two at the end that call setProcessingThread and start on it. Since the TaskProducer also needs a reference to the ServerSocketConnection, the latter has to be instantiated right before creating the TaskProducer so that its reference can be passed to the constructor. Then, because these two classes reference each other, we need to hand the TaskProducer to the ServerSocketConnection through a setter method.

Now that we have the server socket class, we have to write the SocketConnectionThread class, which listens for information requests (commands) from the clients and responds with data from the statistics accumulated by the TaskProducer.

Class SocketConnectionThread.java:

package com.test.multithread;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class SocketConnectionThread extends Thread {
	private Socket socket = null;
	private final TaskProducer taskProducer;

	public SocketConnectionThread(final Socket socket, final TaskProducer ep) {
		super("SocketConnectionThread");
		this.socket = socket;
		this.taskProducer = ep;
	}

	@Override
	public void run() {
		// try-with-resources closes the streams and the socket even if an exception is thrown
		try (Socket client = socket;
				PrintWriter out = new PrintWriter(client.getOutputStream(), true);
				BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()))) {

			String inputLine, outputLine;
			final CommunicationProtocol protocol = new CommunicationProtocol();
			// readLine() blocks until the client sends a full line, or returns null on disconnect
			while ((inputLine = in.readLine()) != null) {
				outputLine = protocol.processInput(inputLine, taskProducer);
				out.println(outputLine);

				if (outputLine.equals("Bye"))
					break;
			}
		} catch (final IOException e) {
			e.printStackTrace();
		}
	}
}


As in ServerSocketConnection, the logic is concentrated in the run method, but here the blocking happens in a different place. Now it is the read from the socket's InputStream (the call to in.readLine()) that blocks the thread while it waits for requests. Every time a new line (a command) arrives from the client, the thread wakes up and processes the request.

Here we have to make a decision. How are we going to process the text coming from the client? Are we going to write that code in this class? What I usually do in these scenarios is encapsulate the logic that interprets the client's commands in a class that acts as a communication protocol parser. This class could be used both in the server and in the service objects of multiple clients. That is why we have a third class here:

Class CommunicationProtocol.java:

package com.test.multithread;

public class CommunicationProtocol {

	public String processInput(final String theInput, final TaskProducer taskProducer) {
		final String output = "Unknown command";

		if (theInput.equalsIgnoreCase("exit")) {
			return "Bye";
		} else if (theInput.equalsIgnoreCase("stop")) {
			taskProducer.stopProcessing();
			return "Stopping";
		} else if (theInput.equalsIgnoreCase("totalQueuedItems")) {
			return Long.toString(taskProducer.getTotalQueuedItems());
		} else if (theInput.equalsIgnoreCase("totalProcessedItems")) {
			return Long.toString(taskProducer.getTotalProcessedItems());
		} else if (theInput.equalsIgnoreCase("averageProcessingTime")) {
			return Long.toString(taskProducer.getAverageProcessingTime());
		}

		return output;
	}
}


The code in this class is really simple, but it could become quite complex depending on how sophisticated your commands need to be. The idea here is to add another layer of abstraction so that the SocketConnectionThread class doesn't have to deal with protocol-related issues.

As you can see, I provide the process with five simple commands, since it is not the intent of this article to explore ideas on how to manipulate or grab information from daemon processes. These three statistical queries and two behavioral commands (exit and stop) serve as examples of what can be done using this approach.
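
If the commands ever need arguments, one possible direction is a table-driven parser. The sketch below is only an illustration under assumptions: the class ArgumentProtocolSketch, the commands getWait and setWait, and the waitMillis setting are all hypothetical and do not exist in the daemon above. It shows how a command name can be separated from its argument and dispatched through a map instead of a chain of if/else branches.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a table-driven protocol whose commands may carry one argument.
class ArgumentProtocolSketch {
	private final Map<String, Function<String, String>> commands = new HashMap<>();
	private int waitMillis = 500; // stand-in for a setting the daemon could expose

	ArgumentProtocolSketch() {
		// Command names are stored in lower case so lookups are case-insensitive.
		commands.put("exit", arg -> "Bye");
		commands.put("getwait", arg -> Integer.toString(waitMillis));
		commands.put("setwait", this::setWait);
	}

	private String setWait(final String arg) {
		try {
			final int value = Integer.parseInt(arg);
			if (value < 0) {
				return "Invalid value";
			}
			waitMillis = value;
			return "OK";
		} catch (final NumberFormatException e) {
			return "Invalid value";
		}
	}

	String processInput(final String input) {
		// Split "command argument" on the first run of whitespace.
		final String[] parts = input.trim().split("\\s+", 2);
		final String name = parts[0].toLowerCase(Locale.ROOT);
		final String arg = parts.length > 1 ? parts[1] : "";
		final Function<String, String> handler = commands.get(name);
		return handler == null ? "Unknown command" : handler.apply(arg);
	}

	public static void main(final String[] args) {
		final ArgumentProtocolSketch protocol = new ArgumentProtocolSketch();
		System.out.println(protocol.processInput("setWait 250"));
		System.out.println(protocol.processInput("getWait"));
		System.out.println(protocol.processInput("bogus"));
	}
}
```

Adding a command then means adding one entry to the map, and the connection thread stays untouched, which is the point of keeping the protocol in its own class.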

In order to report this statistical information to our clients, we need to keep track of how many tasks have been queued, how many have been processed, and how long each task takes to complete. But where does this information belong?

The natural place for the total number of queued items is the TaskProducer, while both the number of processed items and the average processing time come from the TaskConsumer, so we will need to make some changes to both classes.

Class TaskProducer.java:

package com.test.multithread;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskProducer extends Thread {
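	// volatile: set by a socket connection thread, read by the producer thread
	private volatile boolean blnExit = false;
	private final List<TaskConsumer> consumers;
	private final BlockingQueue<Long> sharedQueue;
	private final ServerSocketConnection socketServer;
	// volatile: written only by the producer thread, read by socket connection threads
	private volatile long totalQueuedItems = 0;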

	public TaskProducer(final BlockingQueue<Long> sharedQueue, final ServerSocketConnection socketServer, final List<TaskConsumer> consumers) {
		this.sharedQueue = sharedQueue;
		this.consumers = consumers;
		this.socketServer = socketServer;
	}

	@Override
	public void run() {
		long i = 0;

		////////////////////////////////////////////
		// PRODUCING THE OBJECTS TO BE CONSUMED
		////////////////////////////////////////////
		while (!blnExit) {
			try {
				i++;
				sharedQueue.put(Long.valueOf(i)); // blocks while the queue is full
				totalQueuedItems++; // count the item only after it was actually queued

			} catch (final InterruptedException ex) {
				Logger.getLogger(TaskProducer.class.getName()).log(Level.SEVERE, null, ex);
			}
		}

		/////////////////////////////////
		// WAIT UNTIL THE QUEUE IS EMPTY
		/////////////////////////////////
		while (sharedQueue.size() > 0) {
			try {
				Thread.sleep(200);
				System.out.println("Producer waiting to end.");

			} catch (final InterruptedException e) {
				break;
			}
		}

		////////////////////////////////////////////
		// SEND TO ALL CONSUMERS THE EXIT CONDITION
		////////////////////////////////////////////
		for (final TaskConsumer consumer : consumers) {
			consumer.setExitCondition(true);
		}
		socketServer.close();

	}

	public void setExitCondition(final boolean blnDoExit) {
		blnExit = blnDoExit;
	}

	public long getTotalQueuedItems() {
		return totalQueuedItems;
	}

	public long getTotalProcessedItems() {
		long total = 0;
		for (final TaskConsumer consumer : consumers) {
			total += consumer.getTotalConsumedItems();
		}
		return total;
	}

	public void stopProcessing() {
		blnExit = true;
		socketServer.setListening(false);
	}

	public long getAverageProcessingTime() {
		long total = 0;
		for (final TaskConsumer consumer : consumers) {
			total += consumer.getAverageProcessingTime();
		}
		return total / consumers.size();
	}
}


Here we added a method for each piece of data the clients are going to be interested in, plus stopProcessing for the stop command (the exit command is handled entirely by the protocol class). getAverageProcessingTime and getTotalProcessedItems are calculated by collecting the information kept in each of the TaskConsumer objects, and getTotalQueuedItems simply returns the value of a class attribute.

The TaskConsumer class keeps statistical information about its own work so that it can hand it over to the TaskProducer whenever it is needed.

Class TaskConsumer.java:

package com.test.multithread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskConsumer extends Thread {
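	// volatile: set by the producer thread, read by this consumer thread
	private volatile boolean blnExit = false;
	private final int id;
	// volatile: written only by this consumer thread, read by socket connection threads
	private volatile long totalConsumedItems = 0;
	private volatile long averageProcessingTime = 0;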
	private final BlockingQueue<Long> sharedQueue;

	public TaskConsumer(final int id, final BlockingQueue<Long> sharedQueue) {
		this.id = id;
		this.sharedQueue = sharedQueue;
	}

	public void setExitCondition(final boolean blnDoExit) {
		blnExit = blnDoExit;
	}

	@Override
	public void run() {
		final Random generator = new Random();

		while (!blnExit) {
			try {
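				// poll() returns null instead of blocking, so a consumer cannot hang in take()
				// when another consumer empties the queue between the size check and the take
				final Long task = sharedQueue.poll();
				if (task != null) {
					System.out.println("Consumer id " + id + " has worked task " + task);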

					// TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
					final long start = System.currentTimeMillis();
					Thread.sleep(generator.nextInt(1000) + 1000);
					final long end = System.currentTimeMillis();

					totalConsumedItems++;
					averageProcessingTime = (averageProcessingTime + (end - start)) / 2;

				} else
					Thread.sleep(500);

			} catch (final InterruptedException ex) {
				Logger.getLogger(TaskConsumer.class.getName()).log(Level.SEVERE, null, ex);
			}
		}

		System.out.println("Consumer " + id + " exiting");
	}

	public long getTotalConsumedItems() {
		return totalConsumedItems;
	}

	public long getAverageProcessingTime() {
		return averageProcessingTime;
	}
}


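Since we do not want to store historical data for each task consumed, I decided to provide only a running average of the processing time. Note that the update used here, (average + latest) / 2, weights recent tasks more heavily than a true mean would. This information could be much more accurate, but that is outside the scope of this article.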
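
For readers who want a true mean without storing per-task history, an incremental update is one option. This is a minimal sketch under assumptions: the RunningMean class is hypothetical and is not used by the TaskConsumer above. It keeps only a count and the current mean, and applies mean += (sample - mean) / count for each new sample.

```java
// Hypothetical helper, not part of the article's code: a cumulative mean that
// needs only two fields of state, so no per-task history is stored.
class RunningMean {
	private long count = 0;
	private double mean = 0.0;

	// Incremental update: mean += (sample - mean) / count
	synchronized void add(final long sampleMillis) {
		count++;
		mean += (sampleMillis - mean) / count;
	}

	synchronized long getCount() {
		return count;
	}

	synchronized double getMean() {
		return mean;
	}

	public static void main(final String[] args) {
		final RunningMean processingTime = new RunningMean();
		processingTime.add(1000);
		processingTime.add(2000);
		processingTime.add(3000);
		System.out.println(processingTime.getMean()); // prints 2000.0
	}
}
```

For the same three samples, the (average + latest) / 2 update in TaskConsumer yields 500, then 1250, then 2125, which shows how it leans toward the most recent task.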

Now everything is in its place. We just need the client socket GUI application, but we are not going to write it here. It's been written thousands of times, so let's not reinvent the wheel, shall we? :)

I have used SocketTest with this article and it works beautifully, but feel free to use any other general-purpose socket client you want.

Download SocketTest from http://sockettest.sourceforge.net/ and you are ready to go!

Play with this tool until you feel comfortable with it. Once you are ready to test our application, start the daemon and then connect the socket client using the default host (127.0.0.1, also known as localhost or loopback) and port 3232 (or whichever one you chose).

Now you can query the daemon like this:

SocketTestProducerConsumer.png
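
If you would rather script a query than use a GUI client, a few lines of Java are enough. The following is a rough sketch, assuming the daemon runs on 127.0.0.1:3232 and answers every command line with exactly one line, as the CommunicationProtocol above does. The class name DaemonQueryClient and its sendCommand method are hypothetical and are not part of the daemon.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

// Hypothetical client, assuming one reply line per command line.
class DaemonQueryClient {

	static String sendCommand(final String host, final int port, final String command) throws IOException {
		try (Socket socket = new Socket(host, port);
				PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
				BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
			out.println(command); // autoflush sends the line immediately
			return in.readLine(); // blocks until the daemon replies; null if it disconnects
		}
	}

	public static void main(final String[] args) throws IOException {
		// Pass the command as the first argument; defaults to one of the article's commands.
		final String command = args.length > 0 ? args[0] : "totalQueuedItems";
		System.out.println(command + " -> " + sendCommand("127.0.0.1", 3232, command));
	}
}
```

Each call opens a fresh connection and closes it after one reply, so the daemon's per-client thread ends as soon as its readLine() returns null.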


Remember that the daemon process is continuously processing credit card applications and this add-on will simply provide information about the process execution to remote clients.

Have fun and feel free to ask questions if you need to.

Published at DZone with permission of Andres Navarro. See the original article here.
