Introduction to JOODAMP Framework for Bulk Data Processing

By Pradeep Duraisamy · Feb. 20, 09

This article gives a brief introduction to JOODAMP (Java Pooled Data Multi Processor, https://joodamp.dev.java.net), a framework built from scratch for processing huge amounts of data. It can be integrated easily with a standalone Java application or an enterprise Java application. Only the base framework is done so far, and we need plenty of comments and new ideas to extend it to fit all users' needs.

JOODAMP processes data using multiple Java threads, manages those threads, and writes the output to a file or to another system such as a database. The class diagram of the framework is given below.

[Class diagram: JOODAMP Framework]

MultiProcessor is the main class an application uses to configure JOODAMP. Pool is a shared memory area that holds the input queues and output queues. A data-processing thread takes the data it needs from the pool and writes its output data to an output queue.

There is a separate thread for each input and output queue that unmarshals data from the file into the queue and marshals data from the queue back to the file (or other data store).
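
To make this concrete, below is a minimal plain-Java sketch of the same pattern, using BlockingQueue in place of JOODAMP's pool. This is not the framework's source; the class name and the simplified shutdown logic are assumptions made purely for illustration.

import java.io.*;
import java.util.concurrent.*;

// Illustration only, not JOODAMP code: one thread per queue moves data between
// the file and an in-memory queue. The task threads (not shown here) would take
// records from inputQueue and put their results into outputQueue.
public class QueueIoSketch {

    public static void main(String[] args) {
        BlockingQueue<String> inputQueue = new LinkedBlockingQueue<String>(10000);
        BlockingQueue<String> outputQueue = new LinkedBlockingQueue<String>(10000);

        // Input-queue thread: unmarshals records from the input file into the queue.
        Thread inputThread = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(new FileReader("c:\\input.txt"))) {
                String line;
                while ((line = in.readLine()) != null) {
                    inputQueue.put(line);             // blocks when the queue is full
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Output-queue thread: marshals records from the queue into the output file.
        Thread outputThread = new Thread(() -> {
            try (PrintWriter out = new PrintWriter(new FileWriter("c:\\output.txt"))) {
                String record;
                while ((record = outputQueue.poll(5, TimeUnit.SECONDS)) != null) {
                    out.println(record);              // stop after 5 idle seconds (simplified)
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        inputThread.start();
        outputThread.start();
    }
}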

As an example, we will sum the two numbers on each line of an input file and write the results to another file.

Below is the input file content:
10, 20
30, 40
...

First, we need to configure JOODAMP with the queues we are using and initialize it. Below is the code to do this:

// Task class, thread count, synchronous flag, and completion listener
MultiProcessor processor = new MultiProcessor(org.joodamp.multiprocessor.testing.TestTask.class, 3, true, null);
// Logical queue name, queue size, resource stream, DO used for (un)marshalling, and data store type
processor.addInputQueue("input", 10000, new FileInputStream("c:\\input.txt"), new InputDO(), MultiProcessor.DATASTORE_FILE);
processor.addOutputQueue("output", 10000, new FileOutputStream("c:\\output.txt"), null, MultiProcessor.DATASTORE_FILE);
processor.start();

The first line creates a MultiProcessor object. The first argument is the class of the task (the data-processing thread) that processes the data, the second is the thread count, the third indicates whether the run is synchronous or asynchronous, and the last is the listener used when the run is asynchronous. Then we add the input and output queues to the initialized pool using the addInputQueue() and addOutputQueue() methods.
The first argument of each method is a logical name for that particular queue, the second is the queue size, the third is the InputStream (or OutputStream) that specifies the input (or output) resource, the fourth is the DO used to unmarshal the data (null here for the output queue), and the last tells the framework that the resource is a file.

Internally, a thread is created for each input queue and output queue, and the data is loaded into the queues so that the task threads (data-processing threads) can take it for processing.

Input data and output data are stored in DOs (data objects) that implement the DataInput and DataOutput interfaces and provide the unMarshal and marshal methods, respectively.

The unMarshal method splits the input string in two and stores the values in the corresponding member variables.

The marshal method serializes the data from the member variables and returns an object.

Below is the code for both DOs.

InputDO

import java.util.StringTokenizer;

// DataInput, DataFactory, and Data are JOODAMP framework interfaces.
public class InputDO implements DataInput, DataFactory {

    public int a;
    public int b;

    /** Creates a new instance of InputDO. */
    public InputDO() {
    }

    /** Splits one input line (e.g. "10, 20") into the two operands. */
    public void unMarshal(Object obj) {
        String str = (String) obj;
        StringTokenizer st = new StringTokenizer(str, ",");
        if (st.hasMoreTokens()) {
            a = Integer.parseInt(st.nextToken().trim()); // trim handles the space after the comma
        }
        if (st.hasMoreTokens()) {
            b = Integer.parseInt(st.nextToken().trim());
        }
    }

    /** Used by the framework to create a fresh DO for each input record. */
    public Data createInstance() {
        return new InputDO();
    }
}

OutputDO

// DataOutput is a JOODAMP framework interface.
public class OutputDO implements DataOutput {

    public int a;
    public int b;
    public int sum;

    /** Creates a new instance of OutputDO. */
    public OutputDO() {
    }

    /** Serializes the result as one output line: "a,b,sum". */
    public Object marshal() {
        return a + "," + b + "," + sum;
    }
}

The DataFactory interface is used by the framework to create input data objects.
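
As a quick sanity check, the two DOs can also be exercised by hand outside the framework. The snippet below is only an illustration and is not part of JOODAMP itself.

// Round-trip one record through the DOs manually (illustration only).
InputDO in = new InputDO();
in.unMarshal("10, 20");            // in.a == 10, in.b == 20

OutputDO out = new OutputDO();
out.a = in.a;
out.b = in.b;
out.sum = in.a + in.b;

System.out.println(out.marshal()); // prints "10,20,30"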

Now we have to create the task (the data-processing thread) that processes all the data. This class should extend the Task class and override the execute method.

Below is the example code:

// Task, PoolContext, and Pool are JOODAMP framework classes.
public class TestTask extends Task {

    PoolContext cxt;
    Pool pool;

    /** Creates a new instance of TestTask. */
    public TestTask() {
    }

    /** Called by the framework before execute(); gives the task access to the pool. */
    public void init(PoolContext cxt) {
        this.cxt = cxt;
        pool = cxt.getPool();
    }

    /** Reads from the "input" queue, sums the operands, and writes to the "output" queue. */
    public void execute() {
        System.out.println("Thread started...");
        int count = 0;
        InputDO input = null;
        while (pool.isInputDataAvailable("input")) {
            input = (InputDO) pool.getInputData("input");
            if (input != null) {
                OutputDO output = new OutputDO();
                output.sum = input.a + input.b;
                output.a = input.a;
                output.b = input.b;
                pool.addOutputData("output", output);
                count++;
            }
        }
        System.out.println(count); // number of records this thread processed
    }

    public void destroyTask() {
    }
}

In the above program, the framework hands a PoolContext to init(), and from it we get a reference to the Pool. Inside the execute method we put the data-processing logic: first we check whether the input queue has any data, then we get the data from the queue using the getInputData() method, which returns a DataInput object. From it we read the two numbers, compute their sum, create an OutputDO, set all the values, and add the OutputDO to the output queue.

The output file is shown below:

10, 20, 30
30, 40, 70
...

The framework takes care of loading all the data into the input queues, writing all the data from the output queues, and managing all the task threads.
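
To give a feel for how much of that plumbing the framework hides, here is a rough plain-Java sketch of the equivalent worker management with java.util.concurrent. The class name and the tiny in-memory queues are assumptions for illustration only, not JOODAMP internals.

import java.util.Arrays;
import java.util.concurrent.*;

// Illustration only: three worker threads drain an input queue, compute the sum,
// and push the result to an output queue -- roughly what JOODAMP's task threads do.
public class WorkerPoolSketch {

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> inputQueue =
                new LinkedBlockingQueue<String>(Arrays.asList("10, 20", "30, 40"));
        BlockingQueue<String> outputQueue = new LinkedBlockingQueue<String>();

        // The "thread count" passed to the MultiProcessor constructor.
        ExecutorService workers = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            workers.submit(() -> {
                String line;
                // Same shape as TestTask.execute(): process until the input queue is empty.
                while ((line = inputQueue.poll()) != null) {
                    String[] parts = line.split(",");
                    int a = Integer.parseInt(parts[0].trim());
                    int b = Integer.parseInt(parts[1].trim());
                    outputQueue.add(a + "," + b + "," + (a + b));
                }
            });
        }

        // Wait for all workers to finish; the results could then be written to a file.
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES);
        for (String record : outputQueue) {
            System.out.println(record);
        }
    }
}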
