How to Develop a Data Processing Job Using Apache Beam
In this post, I'll show concrete examples and highlight several use cases of data processing jobs using Apache Beam.
Are you familiar with Apache Beam? If not, don't be ashamed: as one of the latest projects developed by the Apache Software Foundation, first released in June 2016, Apache Beam is still relatively new in the data processing world. As a matter of fact, it wasn't until recently, when I started to work closely with Apache Beam, that I loved to learn and learned to love everything about it.
Apache Beam is a unified programming model that provides an easy way to implement batch and streaming data processing jobs and run them on any execution engine using a set of different IOs. Sound promising but still confusing? This is why I decided to launch a series of blog posts on Apache Beam. In this post, and in the following ones, I'll show concrete examples and highlight several use cases of data processing jobs using Apache Beam.
Our topic for today is batch processing. Let's take the following example: You work for a car dealership and want to analyze car sales over a given period of time (e.g. how many cars of each brand were sold?). This means that our data set is bounded (finite amount of data) and it won't be updated (the sales happened in the past). In this case, we can rely on a batch process to analyze our data.
As input data, we have text logs of sold cars in the following format:
id,brand_name,model_name,sales_number
For example:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4
Before starting to implement our first Beam application, we need to become familiar with a few core concepts that will be used all the time later on. There are three main concepts in Beam: Pipeline, PCollection, and PTransform.
- Pipeline encapsulates the workflow of your entire data processing tasks from start to finish.
- PCollection is a distributed dataset abstraction that Beam uses to transfer data between PTransforms.
- PTransform is a processing step that operates on input data (an input PCollection) and produces output data (an output PCollection). Usually, the first and the last PTransforms represent a way to input/output data, which can be bounded (batch processing) or unbounded (streaming processing).
To simplify things, we can consider a Pipeline as a DAG (directed acyclic graph) that represents your whole workflow, PTransforms as the nodes of this graph (they transform the data), and PCollections as its edges. More information can be found in the Beam Programming Guide.
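To make the analogy concrete, here is a minimal sketch of a pipeline skeleton in Java. The paths and the trivial transform are placeholders, not the example we build below:
Pipeline pipeline = Pipeline.create();                         // the whole DAG

PCollection<String> lines =                                    // an edge (data)
    pipeline.apply(TextIO.read().from("/placeholder/input"));  // a node (read)

lines
    .apply(MapElements.via(new SimpleFunction<String, String>() {
      @Override
      public String apply(String line) {
        return line.trim();                                    // a node (transform)
      }
    }))
    .apply(TextIO.write().to("/placeholder/output"));          // a node (write)

pipeline.run();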
Now, let's get back to our example and try to implement the first pipeline, which will process the provided dataset.
Creating a Pipeline
First, just create a new pipeline:
Pipeline pipeline = Pipeline.create();
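As a side note, a pipeline is often created from PipelineOptions parsed from the command-line arguments instead; here is a short sketch of that variant (args is assumed to come from your main method):
// Parse standard Beam options (for example, --runner) from the command line.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);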
Then, let's create a new PTransform using the pipeline.apply() method, which will read data from the text file and create a new PCollection of strings. To do this, we use one of the IOs already implemented in Beam. TextIO allows us to read from and write into text file(s) line by line. It has many other features, like working with different file systems, supporting file patterns, and streaming of files. For more information, see the Apache Beam documentation.
apply(TextIO.read().from("/path/to/input/file"))
The output of this PTransform is a new instance of PCollection<String>, where every entry of the collection is a text line of the input file.
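Since TextIO supports file patterns, the same read could also pick up several log files at once; a small sketch with a hypothetical glob:
// Read all matching log files into a single PCollection (the pattern is hypothetical).
PCollection<String> lines =
    pipeline.apply(TextIO.read().from("/path/to/input/cars_sales_log_*"));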
Since we want to have the total number of sales per brand as a result, we must group them accordingly. Therefore, the next step will be to parse every line and create a key/value pair where the key is a brand name and the value is a number of sales. It's worth mentioning that the output PCollection from the previous PTransform will be the input PCollection for this one.
.apply("ParseAndConvertToKV", MapElements.via(
new SimpleFunction<String, KV<String, Integer>>() {
@Override
public KV<String, Integer> apply(String input) {
String[] split = input.split(",");
if (split.length < 4) {
return null;
}
String key = split[1];
Integer value = Integer.valueOf(split[3]);
return KV.of(key, value);
}
}
))
In this step, we use the Beam built-in PTransform called MapElements to create a new key/value pair for every input entry using the provided implementation of the SimpleFunction interface.
We then group the number of sales by brand using another built-in Beam transform, GroupByKey. As an output, we have a PCollection of key/value pairs where the key is a brand name and the value is an iterable collection of sales numbers for that brand.
.apply(GroupByKey.<String, Integer>create())
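If we assigned the grouped result to a variable, its type would make this shape explicit (parsedSales stands for the output of the ParseAndConvertToKV step and is named here only for illustration):
// Each element pairs a brand name with all of its sales numbers.
PCollection<KV<String, Iterable<Integer>>> salesPerBrand =
    parsedSales.apply(GroupByKey.<String, Integer>create());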
Now we are ready to sum up all the car sales numbers per brand using our own implementation of a ParDo transform:
.apply("SumUpValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
Integer totalSales = 0;
String brand = context.element().getKey();
Iterable<Integer> sales = context.element().getValue();
for (Integer amount : sales) {
totalSales += amount;
}
context.output(brand + ": " + totalSales);
}
})
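As a side note, Beam also provides built-in combiners that could replace the GroupByKey and SumUpValuesByKey steps above; here is a sketch using Sum.integersPerKey() (again, parsedSales is just an illustrative name for the output of ParseAndConvertToKV):
// Combine the parsed KV<String, Integer> elements per key in one step...
PCollection<KV<String, Integer>> totals = parsedSales.apply(Sum.integersPerKey());

// ...then format every pair as a "brand: total" line for TextIO.write().
PCollection<String> report = totals.apply(MapElements.via(
    new SimpleFunction<KV<String, Integer>, String>() {
      @Override
      public String apply(KV<String, Integer> kv) {
        return kv.getKey() + ": " + kv.getValue();
      }
    }));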
To finalize the pipeline, we apply another IO transform to take the PCollection of strings and write them to a text file:
.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());
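The withoutSharding() call forces a single output file, which is convenient for this small example; by default, TextIO writes several shards. A variant that keeps sharding and only adds a file suffix might look like this (the output prefix is an assumption):
// Writes sharded files named like <prefix>-00000-of-00003.txt by default.
.apply(TextIO.write().to("/path/to/output/report").withSuffix(".txt"));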
The last thing we need to do is run our created pipeline:
pipeline.run();
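Note that run() can return before the pipeline actually completes on some runners. If you want the program to block until the job is finished, which is handy when testing locally, you can wait on the returned result; a small sketch:
// Block until the pipeline finishes before the JVM exits.
PipelineResult result = pipeline.run();
result.waitUntilFinish();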
Looks quite easy, doesn't it? This is the power of Apache Beam, which allows us to create complicated data processing pipelines with a minimum amount of code.
Those of you familiar with Hadoop may have noticed that this pipeline resembles something well known:
- It reads and parses text data line by line creating new key/value pairs (Map)
- Then groups these key/values by key (GroupBy)
- Finally, it iterates over all values of one key applying some user function (Reduce)
Yes, that's true: this simple pipeline could be performed with a classic MapReduce job! But just compare how much simpler and clearer it looks in Beam (despite being in Java!), and if we decide to extend our pipeline by adding another transform, it won't become much more complicated.
Building and Running a Pipeline
As I mentioned before, a Beam pipeline can be run on different runners (processing engines):
- Direct Runner
- Apache Apex
- Apache Flink
- Apache Gearpump
- Apache Spark
- Google Cloud Dataflow
To do this, we just need to add the corresponding dependency to our Maven or Gradle project configuration. The good thing is that we don't have to adjust or rewrite the pipeline code to run it on each runner. Even better, we don't have to recompile our jars if all the required runner dependencies were included beforehand: we just need to choose which runner to use, and that's it!
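The runner itself is usually chosen with the --runner pipeline option, as in the mvn command further below, but it can also be set programmatically; a short sketch (picking the DirectRunner here is just an example):
// Select the runner in code instead of on the command line.
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);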
The Direct Runner is a local runner that is usually used to test your pipeline. When using Java, you must specify your dependency on the Direct Runner in your pom.xml.
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.4.0</version>
    <scope>runtime</scope>
</dependency>
Afterwards, you have to compile your project:
# mvn clean package
And run your pipeline on the Direct Runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"
For example, if our input file contains the following data:
# cat /tmp/beam/cars_sales_log
1,Toyota,Prius,3
2,Nissan,Sentra,2
1,Toyota,Yaris,4
3,Ford,Fusion,5
3,Ford,Kuga,3
Then the final result will be like this:
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8
The list of all supported runners, along with instructions on how to use them, can be found on this page.
Finally, all of the code for this example is published in this GitHub repository.
In the next part of this series, I will talk about streaming data processing in Beam. I'll take another example of a data analytics task with an unbounded data source, and we will see what Beam offers us in this case.