
How to Create a Custom StreamSets Origin That Supports Parallel Execution


Origins in StreamSets Data Collector represent the source of data for a pipeline. Learn how to create a custom StreamSets origin that can support parallel execution.


The latest releases of the StreamSets Data Collector (SDC) support multithreaded pipelines. In SDC, a multithreaded pipeline is a pipeline whose origin supports parallel execution, so that the pipeline can run in multiple threads.

This tutorial explains how to get started writing your own custom StreamSets origin that supports parallel execution. It complements the excellent tutorial by Pat Patterson available in the official SDC GitHub repository, which covers the creation process for a single-threaded origin only. I highly recommend completing that tutorial before starting this one. This tutorial refers to SDC version 2.4.0.0, but the process should apply to newer versions as well.

What Is a StreamSets Origin?

In SDC, an origin stage represents the source of data for a pipeline. An origin reads data from some source, producing records to be consumed by the remainder of the pipeline. Several origins are currently available in the SDC libraries and they cover the most popular data sources, but it is possible to implement custom origins through Data Collector APIs.

Creating and Building an Origin Template

The process to create an origin template is the same as described in Pat's tutorial. It requires Apache Maven. The first step is to create a new custom stage project. From a command shell, execute:

$MAVEN_HOME/bin/mvn archetype:generate -DarchetypeGroupId=com.streamsets \
-DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial \
-DarchetypeVersion=2.4.0.0 -DinteractiveMode=true

During the execution of this command, you will be asked for the groupId, artifactId, and version for the project. Maven generates a template project from the archetype in a directory named after the provided artifactId.
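For example, you might provide values like these (the groupId here is hypothetical; the artifactId and version match the archive name used later in this tutorial):

groupId: com.example
artifactId: samplestage
version: 1.0-SNAPSHOT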

This is the structure of a newly created project:

[Image: origin project structure]

Through Maven, you can then generate the files needed to import the project into your favorite IDE. For Eclipse, move to the root folder of the project and then execute:

$MAVEN_HOME/bin/mvn eclipse:eclipse

Maven also creates template files for implementing a custom destination, a custom processor, and a custom executor in the destination, processor, and executor packages. You can delete them all, because the goal here is to implement a new origin only; a minimal cleanup sketch follows.
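For instance, assuming the hypothetical com.example groupId from above and the default package layout generated by the archetype, the cleanup could look like this:

cd samplestage
rm -rf src/main/java/com/example/stage/destination
rm -rf src/main/java/com/example/stage/executor
rm -rf src/main/java/com/example/stage/processor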

Modifying the Origin Template Code

This involves a couple of different steps.

Extending the Proper Parent Class

The template code contains a class called SampleSource.java that extends the SDC BaseSource abstract class. The first change is to make the SampleSource class extend com.streamsets.pipeline.api.base.BasePushSource:

public abstract class SampleSource extends BasePushSource   

You then have to override the produce method of the new parent class:

public void produce(Map<String, String> offsets, int maxBatchSize) throws StageException  

As you can see from the method signature, the main differences between this case and the single-threaded scenario are a Map of offsets (instead of a single offset) and the absence of a BatchMaker argument (because each thread has to start and manage its own BatchContext).
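Putting these two changes together, a minimal skeleton of the class could look like the following sketch (configuration accessors and the actual thread handling are omitted here and covered below):

import java.util.Map;

import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.BasePushSource;

public abstract class SampleSource extends BasePushSource {

    @Override
    public void produce(Map<String, String> offsets, int maxBatchSize) throws StageException {
        // Thread scheduling and execution go here
        // (see the Thread Scheduling and Execution section below).
    }
}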

Implementing the Thread Class

You need to add the code for the class (it could be an inner one) that implements the java.lang.Runnable interface:

public class RecordGeneratorThread implements Runnable

Implement a constructor for it. In this example, we implement just one, expecting a single argument: an integer that identifies each thread instance at runtime:

RecordGeneratorThread(int threadId)

The action is in the overridden run method. There, you need to start a com.streamsets.pipeline.api.BatchContext:

BatchContext batchContext = getContext().startBatch();

Then, generate some records to be added to the BatchContext:

while (<some condition>) {
    Record record = batchContext.createRecord("Thread #" + threadId);
    Map<String, Field> map = new HashMap<>();
    map.put("fieldName", Field.create("Some Value"));
    record.set(Field.create(map));
    batchContext.getBatchMaker().addRecord(record);
    ...
}

Finally, hand the batch over to be processed by the remainder of the pipeline by calling getContext().processBatch(batchContext).
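Putting the pieces together, here is a minimal sketch of the whole thread class. It assumes RecordGeneratorThread is an inner class of SampleSource (so that getContext() is in scope); the stop condition and the record content are illustrative placeholders:

public class RecordGeneratorThread implements Runnable {

    private final int threadId;

    RecordGeneratorThread(int threadId) {
        this.threadId = threadId;
    }

    @Override
    public void run() {
        // Each thread starts and manages its own BatchContext.
        BatchContext batchContext = getContext().startBatch();
        int generated = 0;
        while (generated < 10) { // illustrative stop condition
            Record record = batchContext.createRecord("Thread #" + threadId);
            Map<String, Field> map = new HashMap<>();
            map.put("fieldName", Field.create("Some Value"));
            record.set(Field.create(map));
            batchContext.getBatchMaker().addRecord(record);
            generated++;
        }
        // Hand the batch over to the remainder of the pipeline.
        getContext().processBatch(batchContext);
    }
}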

Thread Configuration

To allow configuring the number of threads to spawn at each pipeline run, you can add a configuration parameter to the SampleDSource.java class:

@ConfigDef(
     required = false,
     type = ConfigDef.Type.NUMBER,
     defaultValue = "1",
     label = "Thread Count",
     displayPosition = 10,
     group = "SAMPLE"
)
public int threadCount;

This way, you make it available in the origin UI. Its value can be made accessible from the Java code at runtime by overriding the getNumberOfThreads parent method:

@Override
public int getNumberOfThreads() {
     return threadCount;
}

Thread Scheduling and Execution

Thread scheduling and execution need to be done in the SampleSource produce method. One way to do this is to use an ExecutorService with Futures:

ExecutorService executor = Executors.newFixedThreadPool(getNumberOfThreads());
List<Future<?>> futures = new ArrayList<>(getNumberOfThreads());

// Start the threads
for (int i = 0; i < getNumberOfThreads(); i++) {
    Future<?> future = executor.submit(new RecordGeneratorThread(i));
    futures.add(future);
}

// Wait for execution end
for (Future<?> f : futures) {
    try {
        f.get();
    } catch (InterruptedException | ExecutionException e) {
        LOG.error("Record generation threads have been interrupted", e);
    }
}

Finally, terminate the executor:

executor.shutdownNow();

This call also cleans up the threads that were created previously.
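If you prefer a graceful shutdown over an immediate one, a common alternative (a sketch, not part of the original template) is to call shutdown() and then wait for termination:

executor.shutdown();
try {
    // Give running tasks a chance to finish before forcing termination.
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
}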

Build and Deploy

To generate the final artifact, you need to run the following Maven command:

$MAVEN_HOME/bin/mvn clean package

It will generate a tar.gz archive in the target folder of the project. Copy the archive to the $SDC-HOME/user-libs directory of the destination SDC host machine and extract its content:

cd $SDC-HOME/user-libs && tar xvfz samplestage-1.0-SNAPSHOT.tar.gz

Restart SDC in order to make the new origin available for the pipelines.
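For a manual (tarball) installation, restarting typically means stopping the running Data Collector process and launching it again, for example:

$SDC-HOME/bin/streamsets dc

The exact restart procedure depends on how SDC was installed (e.g. as a system service).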

Running the Origin in a Pipeline

After restarting SDC, you will see the new origin in the origin list, and you can use it in any pipeline the same way as the built-in origins.

And that's it!


