Introduction to the JOODAMP Framework for Bulk Data Processing
This article gives a brief introduction to the JOODAMP (Java Pooled Data Multi Processor, https://joodamp.dev.java.net) framework, which was written from scratch. JOODAMP is used for processing large amounts of data, and it is simple to integrate with either a standalone Java application or an enterprise Java application. Only the base framework is complete so far, and we need comments and new ideas to extend it to fit all users' needs.
JOODAMP processes data using multiple Java threads, manages those threads, and writes the output to a file or to another system such as a database. The class diagram of the framework is given below.
MultiProcessor is the main class used by the application to configure JOODAMP. Pool is a shared memory area that contains the input queues and output queues. Each data processing thread takes the data it needs from the pool and writes its output data to an output queue.
Each input queue and each output queue has its own thread, which unmarshals data from a file into the queue or marshals data from the queue to a file.
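To make the pool design concrete, here is a minimal sketch of the same arrangement built only on java.util.concurrent: one thread loads an input queue, several task threads process records, and one thread drains an output queue. It is not JOODAMP code. The class PoolSketch, the method run, the queue capacity of 100, the upper-casing step, and the use of an empty Optional as an end-of-data marker are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the pool design; none of these names come from JOODAMP.
class PoolSketch {

    /** Upper-cases every line using one reader, `workers` task threads, and one writer. */
    static List<String> run(List<String> lines, int workers) {
        BlockingQueue<Optional<String>> input = new LinkedBlockingQueue<>(100);
        BlockingQueue<Optional<String>> output = new LinkedBlockingQueue<>(100);
        List<String> written = new ArrayList<>(); // plays the role of the output file

        // Input thread: loads records, then one empty Optional per task as an end marker.
        Thread reader = new Thread(() -> {
            try {
                for (String line : lines) input.put(Optional.of(line));
                for (int i = 0; i < workers; i++) input.put(Optional.empty());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Task threads: take a record, process it, and hand the result to the output queue.
        List<Thread> tasks = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            tasks.add(new Thread(() -> {
                try {
                    Optional<String> item;
                    while ((item = input.take()).isPresent()) {
                        output.put(Optional.of(item.get().toUpperCase()));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        // Output thread: drains results until it sees the end marker.
        Thread writer = new Thread(() -> {
            try {
                Optional<String> item;
                while ((item = output.take()).isPresent()) {
                    written.add(item.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
        tasks.forEach(Thread::start);
        try {
            reader.join();
            for (Thread t : tasks) t.join();
            output.put(Optional.empty()); // every task is done, so the writer can stop
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"), 2));
    }
}
```

With more than one task thread the order of the output records is not guaranteed, because the tasks finish their records independently.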
As an example, we will sum the two numbers on each line of an input file and write the result to another file.
The input file content is shown below.
10, 20
30, 40
...
First we need to configure JOODAMP with the queues we are using and then start it. Below is the code to do this.
MultiProcessor processor = new MultiProcessor(org.joodamp.multiprocessor.testing.TestTask.class,3,true,null);
processor.addInputQueue("input",10000,new FileInputStream("c:\\input.txt"),new InputDO(),MultiProcessor.DATASTORE_FILE);
processor.addOutputQueue("output",10000,new FileOutputStream("c:\\output.txt"),null,MultiProcessor.DATASTORE_FILE);
processor.start();
The first line creates a MultiProcessor object. In the call shown, the first parameter is the class of the task (the data processing thread) that processes the data, the second is the thread count, the third indicates whether processing is synchronous or asynchronous, and the last is the listener used when processing is asynchronous. Then we add the input and output queues to the initialized pool using the addInputQueue() and addOutputQueue() methods.
In addInputQueue(), the first parameter is a logical name for the queue, the second is the queue size, the third is the InputStream that specifies the input resource, the fourth is an InputDO instance (which implements DataFactory) used to create input data objects, and the last parameter indicates that the input resource is a file. Similarly, we add the output queue to the pool with an OutputStream as the output resource.
Internally, a thread is created for each input queue and each output queue. The input threads add data to the queues so that the task threads (the data processing threads) can take it for processing.
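The sketch below illustrates how a per-queue loader thread might move lines from an InputStream into a bounded queue. It assumes a plain java.util.concurrent.ArrayBlockingQueue. QueueLoaderSketch, startLoader, and loadAll are hypothetical names, and the code says nothing about how JOODAMP is implemented internally.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical loader; not part of JOODAMP.
class QueueLoaderSketch {

    /** Starts a thread that copies each line of `source` into `queue`. */
    static Thread startLoader(InputStream source, BlockingQueue<String> queue) {
        Thread loader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(source, StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    queue.put(line); // blocks while the queue is full
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        loader.start();
        return loader;
    }

    /** Loads `text` through a queue of the given capacity and returns the lines consumed. */
    static List<String> loadAll(String text, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        InputStream source = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        Thread loader = startLoader(source, queue);
        List<String> seen = new ArrayList<>();
        try {
            // Keep consuming until the loader has finished and the queue is drained.
            while (loader.isAlive() || !queue.isEmpty()) {
                String line = queue.poll(10, TimeUnit.MILLISECONDS);
                if (line != null) {
                    seen.add(line);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(loadAll("10, 20\n30, 40\n", 1));
    }
}
```

Because put() blocks when the queue is full, the queue size bounds how much data is held in memory at once, however large the input is.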
Input data and output data are stored in data objects (DOs) that implement the DataInput and DataOutput interfaces, which declare the unMarshal and marshal methods respectively.
The unMarshal method splits the input string in two and stores each value in its member variable.
The marshal method serializes the data from the member variables and returns it as an object.
Below is the code for both DOs.
InputDO
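import java.util.StringTokenizer;

public class InputDO implements DataInput, DataFactory {
    public int a;
    public int b;

    /** Creates a new instance of InputDO */
    public InputDO() {
    }

    /** Parses one input line such as "10, 20" into the fields a and b. */
    public void unMarshal(Object obj) {
        String str = (String) obj;
        StringTokenizer st = new StringTokenizer(str, ",");
        // trim() removes the space after the comma, which Integer.parseInt would reject
        a = Integer.parseInt(st.nextToken().trim());
        b = Integer.parseInt(st.nextToken().trim());
    }

    /** Creates a fresh input data object (DataFactory method). */
    public Data createInstance() {
        return new InputDO();
    }
}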
OutputDO
public class OutputDO implements DataOutput {
    public int a;
    public int b;
    public int sum;

    /** Creates a new instance of OutputDO */
    public OutputDO() {
    }

    /** Serializes the fields as one output line, for example "10, 20, 30". */
    public Object marshal() {
        return a + ", " + b + ", " + sum;
    }
}
The DataFactory interface is used by the framework to create input data objects.
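The round trip through unMarshal and marshal can be tried without the framework. The following is a minimal sketch in which the nested interfaces LineInput, LineOutput, and RecordFactory are hypothetical stand-ins that only mimic the shape of DataInput, DataOutput, and DataFactory as described in this article; MarshalSketch, SumRecord, and convert are likewise invented names.

```java
// Hypothetical stand-ins: the nested interfaces only mimic the shape of the
// article's DataInput, DataOutput and DataFactory so the sketch compiles alone.
class MarshalSketch {
    interface LineInput { void unMarshal(Object raw); }
    interface LineOutput { Object marshal(); }
    interface RecordFactory { LineInput createInstance(); }

    static class SumRecord implements LineInput, LineOutput, RecordFactory {
        int a;
        int b;

        /** Parses a line such as "10, 20" into a and b. */
        @Override
        public void unMarshal(Object raw) {
            String[] parts = ((String) raw).split(",");
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected two numbers: " + raw);
            }
            a = Integer.parseInt(parts[0].trim());
            b = Integer.parseInt(parts[1].trim());
        }

        /** Serializes the record as "a, b, sum". */
        @Override
        public Object marshal() {
            return a + ", " + b + ", " + (a + b);
        }

        /** Factory method: returns a fresh, empty record. */
        @Override
        public LineInput createInstance() {
            return new SumRecord();
        }
    }

    /** Creates a fresh record from the factory, fills it from `line`, and serializes it. */
    static String convert(RecordFactory factory, String line) {
        SumRecord record = (SumRecord) factory.createInstance();
        record.unMarshal(line);
        return (String) record.marshal();
    }

    public static void main(String[] args) {
        System.out.println(convert(new SumRecord(), "10, 20")); // prints "10, 20, 30"
    }
}
```

Passing a prototype object as the factory mirrors the way an InputDO instance is handed to addInputQueue() in the configuration code: the prototype is never filled itself, it only produces new records.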
Next we create the Task (the data processing thread) that processes all the data. This class should extend the Task class and override the execute method.
Below is the example code.
public class TestTask extends Task {
    PoolContext cxt;
    Pool pool;

    /** Creates a new instance of TestTask */
    public TestTask() {
    }

    /** Stores the context and keeps a reference to the pool. */
    public void init(PoolContext cxt) {
        this.cxt = cxt;
        pool = cxt.getPool();
    }

    /** Reads records from the "input" queue and writes their sums to the "output" queue. */
    public void execute() {
        System.out.println("Thread started...");
        int data = 0; // number of records processed by this thread
        InputDO input = null;
        while (pool.isInputDataAvailable("input")) {
            input = (InputDO) pool.getInputData("input");
            // Skip this iteration if no record was returned
            if (input != null) {
                OutputDO output = new OutputDO();
                output.sum = input.a + input.b;
                output.a = input.a;
                output.b = input.b;
                pool.addOutputData("output", output);
                data++;
            }
        }
        System.out.println(data);
    }

    public void destroyTask() {
    }
}
In the above program, the init method receives a pool context, from which we get a reference to the Pool. The data processing logic goes inside the execute method. First we check whether the input queue has any data, and then we get the data from the queue using the getInputData() method, which returns a DataInput object. From that object we read the two numbers, compute their sum, create an OutputDO, set all its values, and add the OutputDO to the output queue.
The output file is shown below
10, 20, 30
30, 40, 70
...
The framework takes care of loading data into the input queues, writing data from the output queues, and managing all the task threads.
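The execute method in TestTask checks for available data and then fetches it in a separate step, guarding against a null result. The sketch below shows why that guard matters when several threads share one queue: between the check and the fetch, another thread may take the last record. It uses java.util.concurrent.ConcurrentLinkedQueue as a stand-in; PollLoopSketch and drain are hypothetical names. The queue is filled before the workers start, which is a simplification, since this article does not describe how JOODAMP signals the end of input.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the check-then-fetch loop; not JOODAMP code.
class PollLoopSketch {

    /** Sums 1..records with `threads` workers; returns {recordsProcessed, total}. */
    static long[] drain(int records, int threads) {
        Queue<Integer> input = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= records; i++) {
            input.add(i);
        }
        AtomicInteger processed = new AtomicInteger();
        AtomicLong total = new AtomicLong();

        Runnable task = () -> {
            while (!input.isEmpty()) {        // check ...
                Integer value = input.poll(); // ... then fetch; null if another worker won the race
                if (value != null) {
                    total.addAndGet(value);
                    processed.incrementAndGet();
                }
            }
        };

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(task);
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return new long[] { processed.get(), total.get() };
    }

    public static void main(String[] args) {
        long[] result = drain(1000, 4);
        System.out.println(result[0] + " records, total " + result[1]);
    }
}
```

Every record is processed exactly once because poll() removes an element atomically. Without the null check, a worker that loses the race would fail with a NullPointerException when unboxing the value.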