In part 1 of this article, we examined parallel processing and some of the complexities it presents. It's important to understand these complexities to appreciate the tools and patterns available to ease the burden. Tools such as Pervasive's DataRush help you build parallel processing applications that automatically scale on multi-core servers without requiring you to write complex multi-threaded code. Part 2 of this article explores Pervasive DataRush, its architecture and programming model, and a sample application. This application uses DataRush to load large amounts of data into a database, where the process is broken down by the framework into pieces that can execute in parallel. The result is a substantial speed-up in data-loading performance (more than 2x on a dual-core machine in the test described later) without explicit parallel processing Java code.
Pervasive DataRush is a Java framework with tools support that allows you to quickly build highly parallel analytic applications that fully utilize today’s multi-core systems. DataRush handles all of the challenges listed in the previous sections, such as threading, synchronization, deadlock detection, partitioning, queuing, and dynamic runtime tuning based on target host resources. It supports both implicit parallelism (using the best algorithms for the problem being solved), and explicit parallelism, where you can control what aspects of a task can be made parallel.
Inside Pervasive DataRush
DataRush is a true parallel processing environment, built upon a dataflow-based architecture, which combines both horizontal and vertical partitioning with pipeline parallelism to take advantage of multi-core systems. Used alone, any of the three techniques is helpful; DataRush allows you to combine them for even more efficiency.
Dataflow is a concept from computer science where data processing operators (small units of business logic) are linked together through ports to form an application. Operators read data from input ports, perform their processing on that data, and write the results to output ports. DataRush links the operator ports with queues to allow the operators to work in parallel.
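To make the dataflow idea concrete, here is a minimal sketch in plain Java rather than DataRush itself. It assumes a modern JDK (Java 9 or later, for List.of), and every name in it (DataflowSketch, run, EOS) is made up for illustration. Two operators run on their own threads and are linked by a bounded blocking queue that stands in for the connection between an output port and an input port.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Plain-Java illustration of dataflow; none of these names come from DataRush.
class DataflowSketch {
    // Sentinel value that marks the end of the stream on the queue.
    private static final String EOS = "\u0000EOS";

    // Runs a two-operator pipeline: upper-case each item, then collect it.
    static List<String> run(List<String> source) throws InterruptedException {
        // The queue plays the role of the link between an output port and an input port.
        BlockingQueue<String> link = new ArrayBlockingQueue<>(4);
        List<String> results = new ArrayList<>();

        // Operator 1: reads its input, transforms it, and writes to its output port.
        Thread upperCase = new Thread(() -> {
            try {
                for (String item : source) {
                    link.put(item.toUpperCase());
                }
                link.put(EOS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Operator 2: reads from its input port until the stream ends.
        Thread collect = new Thread(() -> {
            try {
                for (String item = link.take(); !item.equals(EOS); item = link.take()) {
                    results.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        upperCase.start();
        collect.start();
        upperCase.join();
        collect.join();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("rss", "atom", "csv")));
    }
}
```

Because the queue is bounded, a fast upstream operator blocks when the queue fills up, and a fast downstream operator blocks when it is empty. This is the hand-written plumbing that a dataflow framework is meant to take off your hands.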
DataRush applications contain three main component types (see the component diagram in Figure 6):
1. Process: a single-threaded scalar operator that implements some business logic; the basic unit of dataflow programming in DataRush. A process is analogous to a subtask as described in the previous pipeline example;
2. Assembly: itself an operator, an assembly is a multi-threaded composite of other operators and customizers;
3. Customizer: a subcomponent that dynamically configures, composes, or extends an assembly.
The rules and relationships that govern these components form a tree that represents a complete DataRush application. An assembly is always a root node that contains other Assemblies, Processes, and/or Customizers. When an Assembly contains only other Assemblies, it’s the top-most root of the tree (see Figure 7).
An Assembly may reuse other Assemblies, and can contain an arbitrary number of processes (where you implement business logic) and Customizer classes (where you write code to modify the Assembly definition once the application begins execution). Let’s look at the DataRush development process in detail.
Component Development with DataRush
Working with DataRush requires a slight change in thinking for most developers. For instance, as a Java developer, your first inclination might be to do as much in Java as possible (a trap I fell into at first). However, in order to take advantage of true parallel processing, without writing reams of code just to handle threading issues, you must embrace the DataRush Assembly definition.
This means, for the current release, you must think in terms of DRXML Assemblies, using available operators, and limit your code to Assembly processes (for business logic) and Customizers where applicable. This is a good thing because it means you’ll spend most of your time and effort implementing business logic instead of difficult-to-debug multithreading support code.
It all starts with an Assembly, which you define in XML according to the DRXML specification. However, with the next release of Pervasive DataRush, the entire process will be performed via an all-Java approach; you won’t need to work with DRXML if you don’t want to. Whether you use XML or Java to do so, your Assembly definition defines inputs and outputs in terms of ports, and properties to dynamically manage the runtime behavior of your parallel application (property values are appropriately seeded from a supplied .properties file). Your Assembly can also use other assemblies that you or another developer have already defined, or one of the many predefined data operators that the DataRush framework has provided (see Figure 8).
Finally, you can define outer Assemblies that connect output ports from each referenced Assembly to the input ports of others. Once you’ve done this with an outer Assembly that itself defines no ports, you have a dataflow that defines a complete DataRush application that’s ready to be executed.
The arrows within the Assembly illustrate the data flow (properties are technically not part of the data flow). This is defined with <Link> tags within the Assembly Definition .drxml file, and can be rearranged easily (see Figure 9).
In this example, although the same components are used within the Assembly, the new dataflow changes the behavior of the DataRush application without changing a line of Java code.
Keep in mind that the term operator covers both assemblies and Assembly processes. This allows you to create basic building blocks of parallel processing code, and reuse them recursively to assemble new parallel applications with little to no rework. A single Assembly definition can contain multiple process and customizer instances to further break down the processing into smaller, simpler steps. By utilizing the DataRush framework in this way, you get the benefit of both horizontal and vertical partitioning for parallel processing, while maximizing reuse of your code.
At this point, you probably have some questions, such as:
- How is the data transmission handled from one Assembly’s port to another?
- How many threads are used to manage the execution of my Assemblies?
- When is partially available data transmitted?
Fortunately, you don’t have to worry about these details. DataRush maintains in-memory blocking queues between ports to ensure that data is available to assemblies when they need it, even if they execute at varying speeds. Also, DataRush starts the correct number of threads according to the number of Assembly processes you’ve defined, with some tuning done according to the number of CPUs/cores available.
The DataRush Libraries, Tools, and Development Environment
DataRush is mostly composed of a set of libraries and simple script files to tie them together. The libraries contain the code for DataRush’s execution engine, its programming API, the provided set of operators to be reused in your own DataRush applications, and some internal utility code.
The three script files in the bin directory of your DataRush installation define the three main tools that you will use:
- dra: This is the DataRush assembler. It takes Assembly definitions (.drxml files), along with the referenced .java files (for process and customizer implementations), and compiles them into an executable DataRush application. The dra tool will invoke the Java compiler for the referenced Java code.
- dre: This is the DataRush execution engine. Use this tool to execute a DataRush application once it has been assembled with the dra tool.
- dri: This is the DataRush inspector tool. You can use this tool to inspect the internals of other DataRush applications and Assemblies. Use of this tool is not needed during the code, compile, run phases of a DataRush application, but is provided for your convenience.
Also included is a DataRush Eclipse plug-in that allows you to visually inspect your Assembly definitions and DataRush applications as you define them. Of course, you can also easily create new DataRush projects, Assembly definition files, process and customizer Java classes, and other files needed for DataRush applications, from within Eclipse (see Figure 10).
As you’re writing code or defining assemblies, you compile and debug your DataRush application like you would a plain Java application within Eclipse, with full tools support. For example, you can create application launch configurations and run DataRush projects within Eclipse like any other Java application. Assembly definitions, data flows, and DataRush application runtime output are all visible without leaving Eclipse (see Figure 11).
Of course, all of this functionality (minus the visuals) is available through the command-line tools, dri (DataRush inspector), dra (DataRush assembler), and dre (DataRush engine), outside of the Eclipse environment.
High-Speed, Parallel, Database Loading with DataRush
To illustrate the power of DataRush, let’s explore a sample application that simulates an RSS news feed aggregator. In this application, the XML files that are received from different RSS feeds are read and parsed. Then, the pertinent fields are written to a comma-delimited (CSV) file for bulk import into a database. In this sample, the database will be MySQL, and the mysqlimport tool will be used for bulk loading. Of course, other databases can be used just as easily.
The files from the various feeds reside in separate directories, where each directory can contain zero or more XML files. The generated CSV files are then available to be bulk loaded. Each step of the process (reading the XML files, parsing them, writing the CSV files, and calling the bulk import tool) can be done in parallel, and even the tasks within can benefit from partitioning and pipelining for even more benefit. DataRush will be used to achieve this with minimal coding.
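As a rough illustration of the directory layout just described, the following plain-Java sketch lists the XML files found in each feed subdirectory of a base path. It is not the sample's DirectoryCrawler code; the class and method names (FeedDirectorySketch, xmlFilesByFeed) are hypothetical, and it assumes a modern JDK with java.nio.file.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: maps each feed subdirectory to the XML files it contains.
class FeedDirectorySketch {
    static Map<Path, List<Path>> xmlFilesByFeed(Path base) throws IOException {
        Map<Path, List<Path>> result = new TreeMap<>();
        // First level: one subdirectory per feed.
        try (DirectoryStream<Path> feeds = Files.newDirectoryStream(base, Files::isDirectory)) {
            for (Path feed : feeds) {
                List<Path> files = new ArrayList<>();
                // Second level: zero or more XML files, selected with a glob pattern.
                try (DirectoryStream<Path> xml = Files.newDirectoryStream(feed, "*.xml")) {
                    for (Path file : xml) {
                        files.add(file);
                    }
                }
                Collections.sort(files);
                result.put(feed, files);
            }
        }
        return result;
    }
}
```

Each map entry is an independent unit of work, which is what makes a one-dataflow-per-directory split possible: an empty directory simply yields an empty list.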
The combination of DataRush to achieve maximum parallelization on your multi-core server, with the high-speed bulk loader tools available with most database engines (e.g., MySQL, Oracle), results in a high-speed, scalable database loader application with no thread-specific code needed. All of the multi-threading is handled automatically by DataRush.
For this particular project, I utilized DRXML for significant portions of the process. However, Pervasive DataRush expects to move to an all-Java approach with its Beta 3 release later this year. Once released, Beta 3 will be available as a free download at www.pervasivedatarush.com. For now, let’s take a look at the sample DRXML-based application in detail.
The diagram in Figure 12 shows the assemblies and data flow for the application just described. Since the DataLoader Assembly contains no input or output ports, it defines the DataRush application. It’s also a composite Assembly that contains custom assemblies for the other phases of work. Here is a description of each:
- DataLoader: This is the application Assembly that starts the whole process through these two main steps:
- Pass the base path property to the DirectoryCrawler Assembly to begin file processing.
- A customizer iterates through all subdirectories, and modifies the DataLoader assembly composition (via the DataRush Java APIs) to add instances of each Assembly (DirectoryCrawler, RSSParser, and ArticleImporter) for each subdirectory found. In this sense, the Assembly is recursive.
- DirectoryCrawler: This Assembly is given a base directory, searches for all XML files in the directory, parses their contents using a Document Object Model (DOM) parser, and sends each file’s DOM through the output port as an Object.
- RSSParser: This Assembly takes an XML DOM object as input, traverses the DOM data hierarchy, generates a CSV file with the pertinent article data, and sends the resulting filename via the output port.
- ArticleImporter: This Assembly is given an importer tool to use as a property, along with a CSV file from the input port, and calls the import tool with that filename.
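The parse-then-flatten step described for RSSParser can be sketched with the JDK's built-in DOM parser. This is an illustrative stand-in, not the sample's process class: the names (RssToCsvSketch, parse, toCsvRows) are hypothetical, and the mapping of RSS elements to columns (title, guid, pubDate, description, link) is an assumption about what the "pertinent article data" might be.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper, not the article's RSSParser process class.
class RssToCsvSketch {
    // Parses RSS XML text into a DOM tree.
    static Document parse(String xml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        return factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    // Walks every <item> element and emits one CSV row per article.
    // The element-to-column mapping is an assumption made for this sketch.
    static List<String> toCsvRows(Document dom) {
        List<String> rows = new ArrayList<>();
        NodeList items = dom.getElementsByTagName("item");
        for (int i = 0; i < items.getLength(); i++) {
            Element item = (Element) items.item(i);
            rows.add(quote(text(item, "title")) + ","
                    + quote(text(item, "guid")) + ","
                    + quote(text(item, "pubDate")) + ","
                    + quote(text(item, "description")) + ","
                    + quote(text(item, "link")));
        }
        return rows;
    }

    // Returns the text of the first descendant element with the given tag, or "" if absent.
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
    }

    // Wraps a field in double quotes and doubles any embedded quotes.
    private static String quote(String value) {
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }
}
```

Quoting every field keeps commas inside titles or summaries from splitting a row, at the cost of requiring the bulk loader to be told that fields may be enclosed in quotes.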
The DataLoader Assembly contains a customizer, but no process class. The other assemblies each contain a process class to perform their related business processing. The definition for the DataLoader Assembly is shown in Listing 1. The sections of interest include the <Customizer> entry, and the three <Assembly> entries.
The customizer entry specifies a customizer class in the type attribute, the stage at which to invoke the customizer, and the variables to set. In this case, the customizer will be invoked at composition time, which allows the class to modify the actual Assembly file before it’s compiled. The base directory path to search for XML files and subdirectories, and the import tool, are provided to the customizer via <Set> elements, seeded from the Assembly properties:
<Set target="path" source="path" />
<Set target="importer" source="importer" />
Next, an instance of the DirectoryCrawler Assembly is declared, and the base path property is provided:
<Set target="path" source="path" />
This Assembly instance will search for all XML files in the path, and send each file’s parsed DOM via its output port. Next, the DataLoader Assembly declares an instance of the RSSParser Assembly and links its input port to the output from DirectoryCrawler via the <Link> element. The drxml is verbose and intuitive enough to understand when you read through it:
Next, an instance of the ArticleImporter Assembly is referenced. In this case, its input port (for the CSV file to be loaded) is linked to the output of RSSParser, and the importer property is passed in:
<Set target="importer" source="importer" />
Now let’s take a look at the RSSParser Assembly since it declares both an input and output port, two properties, and a process class (see Listing 2). Ports and properties are declared at the top of the drxml file, within the <Contracts> section. Again, thanks to the verbosity of XML, you can read this and understand what the intent is:
<Property name="fileName" type="string" default="null" />
<Property name="charsetName" type="string" default="ISO-8859-1" />
Next, the Assembly’s process class is declared in the <Process> element. The process class is set with the type attribute, and the instance name is defined. Also, the class’s internal variable, input, is linked to the input port via a <Link> element:
<Link source="domInput" target="input" />
Notice that the output port, filenameOutput, is linked to the process’s internal variable, also named filenameOutput. For the links to succeed, you need to define public getter and setter methods in the process class so that the DataRush framework can access the variables (see Listing 3).
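To show why public accessors matter for this kind of name-based linking, here is a small plain-Java sketch. It does not use the DataRush API: ParserProcess is a hypothetical stand-in for a process class, its fields are typed as Object in place of the real port types, and the link method is only one way a framework could resolve a target name to a setter through reflection.

```java
import java.lang.reflect.Method;

// Illustration only; DataRush's own linking mechanism may work differently.
class AccessorSketch {
    // Hypothetical stand-in for a process class with JavaBean-style accessors.
    public static class ParserProcess {
        private Object input;          // placeholder for an input port type
        private Object filenameOutput; // placeholder for an output port type

        public Object getInput() { return input; }
        public void setInput(Object input) { this.input = input; }

        public Object getFilenameOutput() { return filenameOutput; }
        public void setFilenameOutput(Object filenameOutput) { this.filenameOutput = filenameOutput; }
    }

    // Resolves a link target such as "input" to the public setter "setInput" and invokes it.
    static void link(Object bean, String target, Object value) throws ReflectiveOperationException {
        String setter = "set" + Character.toUpperCase(target.charAt(0)) + target.substring(1);
        // getMethods() only returns public methods, so a private setter would not be found.
        for (Method method : bean.getClass().getMethods()) {
            if (method.getName().equals(setter) && method.getParameterCount() == 1) {
                method.invoke(bean, value);
                return;
            }
        }
        throw new NoSuchMethodException(setter);
    }
}
```

With this approach, a missing or non-public setter surfaces as a NoSuchMethodException at wiring time rather than as a silent failure later in the run.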
Note: Since the application depends on files from multiple directories, with a dataflow per directory, parallel processing only occurs when files are distributed across the directories. It was built this way for the purposes of this article. However, this can limit the degree of parallel processing that will occur if files aren’t evenly distributed. To remedy this in a real-world solution, the DirectoryCrawler Assembly should feed a queue that is read by a set of paired RSSParser and ArticleImporter Assemblies that execute in parallel.
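The shared-queue arrangement suggested in the note can be approximated in plain Java with a fixed thread pool, whose internal work queue plays the role of the queue fed by the crawler. This is a sketch of the idea, not DataRush code: the names (SharedQueueSketch, processAll, parseThenImport) are hypothetical, and the parse-and-import work is passed in as a function so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: every file goes onto one shared queue, regardless of its directory.
class SharedQueueSketch {
    static <T, R> List<R> processAll(List<T> files, Function<T, R> parseThenImport, int workers)
            throws Exception {
        // The pool's internal queue holds pending files; each worker is one parser/importer pair.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<R>> pending = new ArrayList<>();
            for (T file : files) {
                Callable<R> task = () -> parseThenImport.apply(file);
                pending.add(pool.submit(task));
            }
            // Collect results in submission order; get() blocks until each task finishes.
            List<R> results = new ArrayList<>();
            for (Future<R> future : pending) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

A caller might size the pool with Runtime.getRuntime().availableProcessors(). Because workers pull from a single queue, one directory holding most of the files no longer leaves the other cores idle.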
Before execution begins, the DataLoader customizer class (see Listing 4) inserts instances of each of the custom assemblies for each subdirectory it finds in the base directory provided. As a result, each directory is processed in parallel, and data pipelining takes place throughout the XML file parsing and CSV file generation steps to achieve parallelization for the files within each directory.
All of the files within a single directory are parsed, and the important article data within them is written to a single CSV file. This happens in parallel to the assemblies working on the XML files within the other subdirectories. The data is pipelined in parallel between the assemblies working across all of the directories. Finally, for each CSV file created, DataRush invokes the database bulk loader via a command line script.
For directories with many, and larger, files, parsing may continue while the assemblies processing less populated directories finish creating their CSV files and begin the import process. The final result is a balance between CPU-intense parsing, IO operations for file reads and writes, and database bulk loading, all of which is made possible thanks to DataRush, without the need to write special multi-threaded code.
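The final bulk-load step, calling the import tool with a CSV file name, might look like the following in plain Java. This is a sketch under stated assumptions rather than the sample's ArticleImporter code: the class and method names are hypothetical, credentials are assumed to come from a MySQL option file, and the quoting option assumes the CSV fields are double-quoted. The flags shown (--local, --fields-terminated-by, --fields-optionally-enclosed-by) are standard mysqlimport options, and mysqlimport derives the target table from the file name, so article.csv loads into the article table.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper for invoking a bulk loader from Java.
class BulkLoadSketch {
    // Builds the command line: importer, options, database name, then the CSV file.
    static List<String> command(String importer, String database, Path csvFile) {
        return List.of(
                importer,
                "--local",                             // read the file from the client host
                "--fields-terminated-by=,",            // comma-delimited columns
                "--fields-optionally-enclosed-by=\"",  // assumes fields may be double-quoted
                database,
                csvFile.toString());
    }

    // Runs the command, forwards its output to the console, and returns the exit code.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

Passing the arguments as a list avoids shell quoting problems, and checking the returned exit code lets the caller detect a failed import instead of assuming success.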
Running the Sample Application
To run the sample application, you can load the project in Eclipse after installing both the DataRush SDK and the Eclipse plug-in for DataRush development. You can find the source, article data, and other support files for the sample application here. You’ll also need to create a database named newsdata with a table named article. Here is the SQL statement that creates the table with the proper columns in MySQL:
DROP TABLE IF EXISTS `newsdata`.`article`;
CREATE TABLE `newsdata`.`article` (
`TITLE` varchar(80) NOT NULL,
`ID` varchar(45) NOT NULL,
`DATE` varchar(45) default NULL,
`SUMARRY` varchar(1200) default NULL,
`URL` varchar(45) default NULL,
PRIMARY KEY (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
You can then execute the sample application, which works on the XML data within the subdirectories in the data directory. Alternatively, you can execute the following command from the sample application’s root directory, after you ensure the DataRush command-line tools are in your path:
$ dre -cp bin -pf DataLoader.properties DataLoader
Remember that you’ll need Java 6 to run DataRush. The output from the execution should appear similar to that shown in Listing 5. The application should run noticeably faster on a machine with multiple cores than on one with a single CPU/core. With the data packaged with the sample, which includes about 50,000 articles (some real, most generated), I observed the following numbers:
- 2.8GHz single-core CPU: 8.234 seconds for import
- 2.4GHz dual-core CPU: 3.167 seconds for import
With these numbers, you can see that parallel processing improves performance even though the single-core CPU I ran the application on runs at a higher clock rate. In fact, despite the slower clock rate, the same DataRush application ran about 2.6 times as fast on the dual-core CPU (8.234 seconds versus 3.167 seconds). The real magic here is that DataRush allowed me to take advantage of the additional core(s) without writing any additional code.
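The arithmetic behind that comparison is simple enough to check in a few lines of Java. The class and method names below are hypothetical. The clock-normalized figure is only a crude adjustment that assumes run time scales inversely with clock rate, which ignores any architectural differences between the two CPUs.

```java
import java.util.Locale;

// Hypothetical helper for comparing the two timings reported above.
class SpeedupSketch {
    // Plain speed-up: how many times faster the improved run is.
    static double speedup(double baselineSeconds, double improvedSeconds) {
        return baselineSeconds / improvedSeconds;
    }

    // Speed-up after scaling each time by its clock rate (a rough, assumption-laden adjustment).
    static double clockNormalizedSpeedup(double baselineSeconds, double baselineGhz,
                                         double improvedSeconds, double improvedGhz) {
        return (baselineSeconds * baselineGhz) / (improvedSeconds * improvedGhz);
    }

    public static void main(String[] args) {
        double raw = speedup(8.234, 3.167);
        double normalized = clockNormalizedSpeedup(8.234, 2.8, 3.167, 2.4);
        System.out.println(String.format(Locale.ROOT, "raw: %.2f, clock-normalized: %.2f",
                raw, normalized));
    }
}
```

The raw ratio comes out at roughly 2.6 and the clock-normalized ratio at roughly 3.0. Both exceed the 2x that a second core alone would explain, which suggests the two machines differ in more than core count, so the figures are best read as indicative rather than as a controlled benchmark.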
Key Learning and Takeaways
In this article, you’ve seen that developing parallel processing applications that truly take advantage of today’s multi-core servers requires more than writing multi-threaded code. It requires a framework that performs the multi-threading work for you, and allows for task partitioning, and data pipelining, to ensure that even seemingly single-threaded tasks get parallelized.
Further, you’ve seen how Pervasive provides such a framework with its DataRush product, which allows Java developers to concentrate on writing reusable business logic in Java, while providing a parallel processing runtime environment that scales to your multi-core hardware. What’s more, with its focus on tools, and complete integration with Eclipse, DataRush takes almost all of the complexity out of writing and running parallel-processing applications for massive data crunching.