Connecting Apache Camel and Apache Spark
How to use Apache Camel's new Apache Spark component for different architectural styles and use cases.
Apache Camel 2.17 will come with a brand new Apache Spark component. This is great news for everybody working with big data technologies in general and Spark in particular.

The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark jobs. In particular, the Camel connector provides a way to route messages from various transports, dynamically choose a job to execute, use the incoming message as input data for that job, and finally deliver the results of the execution back to the Camel pipeline.
Supported Architectural Styles
The Spark component can be used as a driver application deployed into an application server (or executed as a fat jar); a minimal driver sketch follows below.
The Spark component can also be submitted as a job directly into the Spark cluster.
While the Spark component is primarily designed to work as a long-running job serving as a bridge between the Spark cluster and the other endpoints, you can also use it as a fire-once short job.
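Here is a minimal sketch of the driver-application style, assuming Camel 2.17 and Spark 1.x APIs; the local[*] master, the testrdd.txt path, and the class name are illustrative only, while the bean names match the examples used later in this article:
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.spark.RddCallback;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CamelSparkDriver {

    public static void main(String[] args) throws Exception {
        // The driver application owns the Spark context (local master, purely for illustration)
        JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "camel-spark-driver");

        // Register the RDD and the callback under the names referenced in the endpoint URI
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("myRdd", sparkContext.textFile("testrdd.txt"));
        registry.put("countLinesContaining", new RddCallback<Long>() {
            @Override
            public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
                String pattern = (String) payloads[0];
                return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
            }
        });

        CamelContext camelContext = new DefaultCamelContext(registry);
        camelContext.start();

        // Invoke the Spark job through the Camel producer endpoint
        ProducerTemplate template = camelContext.createProducerTemplate();
        long lines = template.requestBody(
                "spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", "job input", long.class);
        System.out.println("Matching lines: " + lines);

        camelContext.stop();
        sparkContext.stop();
    }
}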
Component URI Format
Currently the Spark component supports only producers: it is intended to invoke a Spark job and return results. You can call RDD, DataFrame, or Hive SQL jobs.
spark:{rdd|dataframe|hive}
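For example, a route can feed messages from any Camel consumer into a Spark job through such a producer endpoint. A minimal sketch, where direct:count-lines and the bean names myRdd and countLinesContaining are placeholders for beans you register yourself (as shown in the sections below):
import org.apache.camel.builder.RouteBuilder;

public class SparkJobRoute extends RouteBuilder {

    @Override
    public void configure() {
        // The incoming message body becomes the job's input payload;
        // the result of the RDD callback becomes the new message body.
        from("direct:count-lines")
            .to("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining");
    }
}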
Working With RDD-oriented Jobs
To invoke an RDD job, use the following endpoint URI:
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Where the rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The result of the callback computation is saved as the body of the exchange.
public interface RddCallback<T> {
    T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}
The following snippet demonstrates how to send a message as input to the job and return the results:
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
The RDD callback for the snippet above, registered as a Spring bean, could look as follows:
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // the registered RDD is a JavaRDD<String> (see the bean definition below)
            return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
        }
    };
}
And the RDD definition in Spring could look as follows:
@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
Annotated RDD Callbacks
Probably the easiest way to work with RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}
…
import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
}
If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
…
import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
}
…
// Convert String "10" to an integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback",
        Arrays.asList(10, "10"), long.class);
Working With DataFrame Jobs
Instead of working with RDDs, the Spark component can work with DataFrames as well. To invoke a DataFrame job, use the following URI:
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
Where the dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from the Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from the registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. The result of the callback computation is saved as the body of the exchange.
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}
The following snippet demonstrates how to send a message as input to a job and return the results:
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel",
model, long.class);
The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:
@Bean
DataFrameCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}
The DataFrame definition in Spring could look as follows:
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
Hive Jobs
Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:
spark:hive
The following snippet demonstrates how to send a message as input to a job and return the results:
long carsCount = template.requestBody("spark:hive?collect=false",
"SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive",
"SELECT * FROM cars", List.class);
The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:
@Bean
DataFrame cars(HiveContext hiveContext) {
    DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
Bridging Spark With Protocol Gateways
The Camel Spark component excels when it comes to bridging Spark processing with various protocol gateways. For example, if your Internet of Things (IoT) device has to perform some action based on the result of a Spark job analysis, you can bridge the Spark component to an MQTT endpoint:
from("paho:analytics?brokerUrl=tcp://mybroker.com:1883").
to("spark:rdd?rdd=#myRdd&rddCallback=#job").
to("paho:analytics-responses?brokerUrl=tcp://mybroker.com:1883");
Now whenever your device sends an MQTT message to the analytics topic, Camel will pass that message to the Spark job defined as an RDD callback. The results of the Spark computation will then be returned to the IoT device via another MQTT topic (analytics-responses).
What’s Next?
This article demonstrates just a part of the Camel and Spark integration possibilities. In the coming articles, I will focus on other useful Spark and Camel features, such as the Camel Spark streaming API, submitting Camel jobs to the Spark cluster, running Camel Spark in a Kubernetes environment, and more.