
Connecting Apache Camel and Apache Spark


How to use Apache Camel's new Apache Spark component for different architectural styles and use cases.


Apache Camel 2.17 will come with a brand new Apache Spark component. This is great news for everyone working with big data technologies in general and Spark in particular.

The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark jobs. In particular, the Camel connector provides a way to route messages from various transports, dynamically choose a Spark job to execute, use the incoming message as input data for that job, and finally deliver the results of the execution back to the Camel pipeline.
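
As a rough illustration of that flow, the following sketch bridges a plain Camel endpoint to an RDD job. The direct:analytics endpoint and the #myRdd/#countLines bean names are placeholders chosen for this example, not something the component provides:

import org.apache.camel.builder.RouteBuilder;

public class SparkBridgeRoute extends RouteBuilder {

  @Override
  public void configure() {
    // The incoming message body becomes the payload passed to the RDD callback;
    // the callback's return value becomes the new message body.
    from("direct:analytics")
        .to("spark:rdd?rdd=#myRdd&rddCallback=#countLines");
  }
}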

Supported Architectural Styles

The Spark component can be used as a driver application deployed into an application server (or executed as a fat jar).
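
For the driver-application style, a minimal bootstrap could look like the sketch below. This is only an illustration: it assumes Camel 2.17 and Spark 1.x on the classpath, and the local master, file name, and bean names are made up for this example.

import org.apache.camel.CamelContext;
import org.apache.camel.component.spark.RddCallback;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;

public class CamelSparkDriver {

  public static void main(String[] args) throws Exception {
    // Spark context living in the driver JVM.
    JavaSparkContext spark = new JavaSparkContext(
        new SparkConf().setAppName("camel-spark-driver").setMaster("local[*]"));

    // Register the RDD and the callback under the names referenced in the endpoint URI.
    SimpleRegistry registry = new SimpleRegistry();
    registry.put("myRdd", spark.textFile("testrdd.txt"));
    registry.put("countLines", new RddCallback<Long>() {
      @Override
      public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
        return rdd.count();
      }
    });

    // Reuse the route sketched earlier to expose the job to the rest of the Camel pipeline.
    CamelContext camel = new DefaultCamelContext(registry);
    camel.addRoutes(new SparkBridgeRoute());
    camel.start();
  }
}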


The Spark component can also be submitted as a job directly to the Spark cluster.


While the Spark component is primarily designed to work as a long-running job serving as a bridge between the Spark cluster and other endpoints, you can also use it as a fire-once short job.

Component URI Format

Currently the Spark component supports only producers: it is intended to invoke a Spark job and return the results. You can call an RDD, DataFrame, or Hive SQL job.

spark:{rdd|dataframe|hive}


Working With RDD-oriented Jobs 

 To invoke an RDD job, use the following endpoint URI:

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

The rdd option refers to the name of an RDD instance (a subclass of org.apache.spark.api.java.AbstractJavaRDDLike) from the Camel registry, while rddCallback refers to an implementation of the org.apache.camel.component.spark.RddCallback interface (also from the registry). The RDD callback provides a single method used to apply incoming messages against the given RDD. The results of the callback computation are saved as the body of the exchange.

public interface RddCallback<T> {
  T onRdd(AbstractJavaRDDLike rdd, Object... payloads);
}

The following snippet demonstrates how to send a message as input to the job and return the results:

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining",  pattern, long.class);

The RDD callback for the snippet above, registered as a Spring bean, could look as follows:

@Bean
RddCallback<Long> countLinesContaining() {
  return new RddCallback<Long>() {
    @Override
    public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
      String pattern = (String) payloads[0];
      return ((JavaRDD<String>) rdd).filter(line -> line.contains(pattern)).count();
    }
  };
}

And the RDD definition in Spring could look as follows:

@Bean
AbstractJavaRDDLike myRdd(JavaSparkContext sparkContext) {  
  return sparkContext.textFile("testrdd.txt");
}
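
The myRdd bean above needs a JavaSparkContext. One way to provide it, sketched here with an illustrative application name and a local master, is another Spring bean:

@Bean
JavaSparkContext sparkContext() {
  // Local master for demonstration purposes; point this at your cluster in a real deployment.
  SparkConf conf = new SparkConf().setAppName("camel-spark").setMaster("local[*]");
  return new JavaSparkContext(conf);
}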

Annotated RDD Callbacks

Probably the easiest way to work with RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
  return annotatedRddCallback(new MyTransformation());
}

...

import org.apache.camel.component.spark.annotation.RddCallback;

public class MyTransformation {

  @RddCallback
  long countLines(JavaRDD<String> textFile, int first, int second) {
    return textFile.count() * first * second;
  }
}

If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
  return annotatedRddCallback(new MyTransformation(), camelContext);
}

...

import org.apache.camel.component.spark.annotation.RddCallback;

public class MyTransformation {

  @RddCallback
  long countLines(JavaRDD<String> textFile, int first, int second) {
    return textFile.count() * first * second;
  }
}

...

// The String "10" will be converted to an Integer to match the annotated method's parameters.
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback",
                                           Arrays.asList(10, "10"), long.class);

Working With DataFrame Jobs

Instead of working with RDDs, the Spark component can work with DataFrames as well. To invoke a DataFrame job, use the following URI:

spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

The dataFrame option refers to the name of a DataFrame instance (an instance of org.apache.spark.sql.DataFrame) from the Camel registry, while dataFrameCallback refers to an implementation of the org.apache.camel.component.spark.DataFrameCallback interface (also from the registry). The DataFrame callback provides a single method used to apply incoming messages against the given DataFrame. The results of the callback computation are saved as the body of the exchange.

public interface DataFrameCallback<T> {
  T onDataFrame(DataFrame dataFrame, Object... payloads);
}

The following snippet demonstrates how to send a message as input to a job and return the results:

String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel",   
                                               model, long.class);

The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:

@Bean
DataFrameCallback<Long> findCarWithModel() {
  return new DataFrameCallback<Long>() {
    @Override
    public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
      String model = (String) payloads[0];
      return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
    }
  };
}

The DataFrame definition in Spring could look as follows:

@Bean
DataFrame cars(HiveContext hiveContext) {  
  DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json"); 
  jsonCars.registerTempTable("cars"); 
  return jsonCars;
}

Hive Jobs

Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:

spark:hive

The following snippet demonstrates how to send a message as input to a job and return the results. With collect=false the component returns only the number of matching rows instead of collecting them:

long carsCount = template.requestBody("spark:hive?collect=false", 
                                      "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", 
                                      "SELECT * FROM cars", List.class);

The table we want to execute the query against should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:

@Bean
DataFrame cars(HiveContext hiveContext) {
  DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
  jsonCars.registerTempTable("cars");
  return jsonCars;
}
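
The HiveContext injected above also has to be available as a bean. A minimal sketch, assuming a JavaSparkContext bean like the one shown earlier, could be:

@Bean
HiveContext hiveContext(JavaSparkContext sparkContext) {
  // Build the Hive-enabled SQL context on top of the underlying SparkContext.
  return new HiveContext(sparkContext.sc());
}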

Bridging Spark With Protocol Gateways

The Camel Spark component excels when it comes to bridging Spark processing with various protocol gateways. For example, if your Internet of Things (IoT) device has to perform some action based on a decision resulting from a Spark job analysis, you can bridge the Spark component to an MQTT endpoint:

from("paho:analytics?brokerUrl=tcp://mybroker.com:1883").  
  to("spark:rdd?rdd=#myRdd&rddCallback=#job").  
  to("paho:analytics-responses?brokerUrl=tcp://mybroker.com:1883");

Now whenever your device sends an MQTT message to the analytics topic, Camel will pass that message to the Spark job defined as an RDD callback. The results of the Spark computation will then be returned to the IoT device via another MQTT topic (analytics-responses).
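
The #job callback referenced in the route is just a regular RDD callback registered as a bean. A simple hypothetical implementation, which ignores the message payload and only reports the size of the data set, could look like this:

@Bean
RddCallback<Long> job() {
  return new RddCallback<Long>() {
    @Override
    public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
      // A real callback would use the MQTT payload(s) to drive the computation;
      // this sketch simply counts the records in the RDD.
      return rdd.count();
    }
  };
}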

What’s Next?

This article demonstrates just a part of the Camel and Spark integration possibilities. In the coming articles, I will focus on other useful Spark and Camel features, such as the Camel Spark streaming API, submitting Camel jobs to the Spark cluster, running Camel Spark in a Kubernetes environment, and more.


Topics: internet of things, apache camel, big data, spark

