How to Create a Custom StreamSets Origin That Supports Parallel Execution
Origins in StreamSets Data Collector represent the source of data for a pipeline. Learn how to create a custom StreamSets origin that can support parallel execution.
The latest releases of the StreamSets Data Collector (SDC) support multithreaded pipelines. In SDC, a multithreaded pipeline is a pipeline whose origin supports parallel execution, so that the pipeline can run in multiple threads.
This tutorial explains how to get started writing your own custom StreamSets origin that supports parallel execution. It complements the excellent tutorial by Pat Patterson available in the official SDC GitHub repository, which covers the creation process for a single-threaded origin only; I highly recommend following that one before starting with this. This tutorial refers to SDC version 2.4.0.0, but the process should apply to newer versions as well.
What Is a StreamSets Origin?
In SDC, an origin stage represents the source of data for a pipeline. An origin reads data from some source, producing records to be consumed by the remainder of the pipeline. Several origins are currently available in the SDC libraries and they cover the most popular data sources, but it is possible to implement custom origins through Data Collector APIs.
Creating and Building an Origin Template
The process to create an origin template is the same as described in Pat's tutorial. It requires Apache Maven. The first step is to create a new custom stage project. From a command shell, execute:
$MAVEN_HOME/bin/mvn archetype:generate -DarchetypeGroupId=com.streamsets \
-DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial \
-DarchetypeVersion=2.4.0.0 -DinteractiveMode=true
During the execution of this command, you will be asked for the groupId, artifactId, and version for the project. Maven generates a template project from the archetype in a directory named after the provided artifactId. This is the structure for a newly created project:
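The screenshot of the generated layout from the original post is not reproduced here; assuming a groupId of com.example and the archetype's default layout (the package path depends on your groupId), the generated tree looks roughly like this:

```
samplestage/
├── pom.xml
└── src/
    └── main/
        ├── java/
        │   └── com/example/stage/
        │       ├── destination/
        │       ├── executor/
        │       ├── origin/
        │       └── processor/
        └── resources/
```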
Through Maven, you can then add the files for the project to be imported in your favorite IDE. For Eclipse, move to the root folder of the project and then execute:
$MAVEN_HOME/bin/mvn eclipse:eclipse
Maven also creates template files to implement a custom destination, a custom processor, and a custom executor, in the destination, executor, and processor packages. You can delete them all because the goal here is to implement a new origin only.
Modifying the Origin Template Code
This involves a couple of different steps.
Extending the Proper Parent Class
The template code contains a class called SampleSource.java that extends the SDC BaseSource abstract class. The first change to make is to have the SampleSource class extend com.streamsets.pipeline.api.base.BasePushSource instead:
public abstract class SampleSource extends BasePushSource
You then have to override the produce method of the new parent class:
public void produce(Map<String, String> offsets, int maxBatchSize) throws StageException
As you can see from the signature of the method, the main differences between this case and the single-threaded scenario are a Map of offsets instead of a single offset, and no BatchMaker argument (because each thread has to start and manage its own BatchContext).
Implementing the Thread Class
You need to add the code for the class (it could be an inner one) that implements the java.lang.Runnable interface:
public class RecordGeneratorThread implements Runnable
Implement a constructor for it. In this example, we are going to implement just one, expecting a single argument: an integer that identifies each thread instance at runtime:
RecordGeneratorThread(int threadId)
The action happens in the overridden run method. There, you need to start a com.streamsets.pipeline.api.BatchContext:
BatchContext batchContext = getContext().startBatch();
Then, generate some records to be added to the BatchContext:
while (<some condition>) {
    Record record = batchContext.createRecord("Thread #" + threadId);
    Map<String, Field> map = new HashMap<>();
    map.put("fieldName", Field.create("Some Value"));
    record.set(Field.create(map));
    batchContext.getBatchMaker().addRecord(record);
    ...
}
And finally, the batch is handed off, via getContext().processBatch(batchContext), to be processed by the remainder of the pipeline.
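Putting the pieces of this section together, the whole thread class might look like the following sketch. It is assumed here to be an inner class of SampleSource (so that getContext() is available), and the loop bound maxRecordsPerBatch is an illustrative field, not part of the SDC API:

```
public class RecordGeneratorThread implements Runnable {
    private final int threadId;

    RecordGeneratorThread(int threadId) {
        this.threadId = threadId;
    }

    @Override
    public void run() {
        // Each thread starts and manages its own BatchContext.
        BatchContext batchContext = getContext().startBatch();
        // Illustrative stop condition; a real origin would also check
        // whether the pipeline is being stopped.
        for (int i = 0; i < maxRecordsPerBatch; i++) {
            Record record = batchContext.createRecord("Thread #" + threadId);
            Map<String, Field> map = new HashMap<>();
            map.put("fieldName", Field.create("Some Value"));
            record.set(Field.create(map));
            batchContext.getBatchMaker().addRecord(record);
        }
        // Hand the finished batch to the rest of the pipeline.
        getContext().processBatch(batchContext);
    }
}
```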
Thread Configuration
In order to allow setting the number of threads to spawn at each pipeline run, you can add a configuration parameter to the SampleDSource.java class:
@ConfigDef(
    required = false,
    type = ConfigDef.Type.NUMBER,
    defaultValue = "1",
    label = "Thread Count",
    displayPosition = 10,
    group = "SAMPLE"
)
public int threadCount;
This way, you make it available in the origin UI. Its value can be made accessible from the Java code at runtime by overriding the getNumberOfThreads parent method:
@Override
public int getNumberOfThreads() {
    return threadCount;
}
Thread Schedule and Execution
Thread scheduling and execution need to be done in the SampleSource produce method. One way to do this is to use an ExecutorService with Futures:
ExecutorService executor = Executors.newFixedThreadPool(getNumberOfThreads());
List<Future<?>> futures = new ArrayList<>(getNumberOfThreads());
// Start the threads
for (int i = 0; i < getNumberOfThreads(); i++) {
    Future<?> future = executor.submit(new RecordGeneratorThread(i));
    futures.add(future);
}
// Wait for execution end
for (Future<?> f : futures) {
    try {
        f.get();
    } catch (InterruptedException | ExecutionException e) {
        LOG.error("Record generation threads have been interrupted", e);
    }
}
Finally, terminate the executor:
executor.shutdownNow();
This call also attempts to stop any previously created threads that are still running.
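To sanity-check the scheduling pattern outside of SDC, here is a self-contained sketch that mirrors the produce logic above: a fixed pool, one Future per thread, a blocking wait, then shutdown. The ParallelDemo and Worker names are ours, not part of the SDC API; the workers just count completions instead of generating records:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelDemo {
    // Counts how many worker threads ran to completion.
    static final AtomicInteger completed = new AtomicInteger();

    static class Worker implements Runnable {
        private final int threadId;

        Worker(int threadId) {
            this.threadId = threadId;
        }

        @Override
        public void run() {
            // A real origin would create and process a BatchContext here.
            completed.incrementAndGet();
        }
    }

    // Mirrors the produce() scheduling: start N threads, wait, shut down.
    public static int runThreads(int threadCount) {
        completed.set(0);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        List<Future<?>> futures = new ArrayList<>(threadCount);
        for (int i = 0; i < threadCount; i++) {
            futures.add(executor.submit(new Worker(i)));
        }
        for (Future<?> f : futures) {
            try {
                f.get(); // blocks until the corresponding worker finishes
            } catch (InterruptedException | ExecutionException e) {
                Thread.currentThread().interrupt();
            }
        }
        executor.shutdownNow();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runThreads(4)); // prints 4
    }
}
```

Because every Future is awaited before shutdownNow(), all workers are guaranteed to have finished, which is exactly the property the produce method relies on before terminating.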
Build and Deploy
To generate the final artifact, you need to run the following Maven command:
$MAVEN_HOME/bin/mvn clean package
It will generate a tar.gz archive in the target folder of the project. Copy the archive to the $SDC-HOME/user-libs directory of the destination SDC host machine and extract its content:
tar xvfz $SDC-HOME/user-libs/samplestage-1.0-SNAPSHOT.tar.gz
Restart SDC in order to make the new origin available for the pipelines.
Running the Origin in a Pipeline
After restarting SDC, you will see the new origin in the origin list, and you can use it in any pipeline in the same way as a built-in origin.
And that's it!