How to Develop a Data Processing Job Using Apache Beam

In this post, I'll show concrete examples and highlight several use cases of data processing jobs using Apache Beam.

By Alexey Romanenko · Apr. 27, 18 · Tutorial

Are you familiar with Apache Beam? If not, don't be ashamed: as one of the latest projects developed by the Apache Software Foundation, first released in June 2016, Apache Beam is still relatively new in the data processing world. As a matter of fact, it wasn't until recently, when I started to work closely with Apache Beam, that I loved to learn and learned to love everything about it.

Apache Beam is a unified programming model that provides an easy way to implement batch and streaming data processing jobs and run them on any execution engine using a set of different IOs. Sound promising but still confusing? This is why I decided to launch a series of blog posts on Apache Beam. In this post, and in the following ones, I'll show concrete examples and highlight several use cases of data processing jobs using Apache Beam.

Our topic for today is batch processing. Let's take the following example: You work for a car dealership and want to analyze car sales over a given period of time (e.g. how many cars of each brand were sold?). This means that our data set is bounded (finite amount of data) and it won't be updated (the sales happened in the past). In this case, we can rely on a batch process to analyze our data.

As input data, we have text logs of sold cars in the following format:

id,brand_name,model_name,sales_number

For example:

1,Toyota,Prius,3

2,Nissan,Sentra,2

3,Ford,Fusion,4

Before starting the implementation of our first Beam application, we need to become familiar with some core concepts that will be used throughout. There are three main concepts in Beam: Pipeline, PCollection, and PTransform.

  • Pipeline encapsulates the workflow of your entire data processing task from start to finish.
  • PCollection is a distributed dataset abstraction that Beam uses to transfer data between PTransforms.
  • PTransform is an operation that takes input data (an input PCollection) and produces output data (an output PCollection). Usually, the first and the last PTransforms represent a way to input/output data, which can be bounded (batch processing) or unbounded (streaming processing).

To simplify things, we can think of a Pipeline as a DAG (directed acyclic graph) that represents your whole workflow, with PTransforms as the nodes (that transform the data) and PCollections as the edges of this graph. More information can be found in the Beam Programming Guide.
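
To make the mapping between these concepts and Java code concrete, here is a minimal, self-contained sketch; the toy data and class name are my own placeholders, not part of the car sales example that follows:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

public class ConceptSketch {
    public static void main(String[] args) {
        // The Pipeline is the graph that holds the whole workflow.
        Pipeline pipeline = Pipeline.create();

        // Each apply() attaches a PTransform (a node of the graph) and returns
        // a new PCollection (an edge) that feeds the next transform.
        PCollection<String> words =
            pipeline.apply(Create.of(Arrays.asList("beam", "batch", "stream")));

        PCollection<String> shouted = words.apply(MapElements.via(
            new SimpleFunction<String, String>() {
                @Override
                public String apply(String input) {
                    return input.toUpperCase();
                }
            }));

        // Nothing runs until the pipeline is handed to a runner.
        pipeline.run();
    }
}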

Now, let's get back to our example and try to implement the first pipeline, which will process the provided dataset.

Creating a Pipeline

First, just create a new pipeline:

Pipeline pipeline = Pipeline.create();
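
In a real project, the pipeline is usually created from PipelineOptions parsed from the command line, which is what later allows us to pass arguments such as --runner=DirectRunner without touching the code. A minimal sketch, using the standard PipelineOptionsFactory (here, args is the argument array of your main method):

// import org.apache.beam.sdk.options.PipelineOptions;
// import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);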

Then, let's create a new PTransform using the pipeline.apply() method, which will read data from a text file and create a new PCollection of strings. To do this, we use one of the IOs already implemented in Beam. TextIO allows us to read from and write to text file(s) line by line. It has many other features, such as working with different file systems, supporting file patterns, and streaming files. For more information, see the Apache Beam documentation.

apply(TextIO.read().from("/path/to/input/file"))

The output of this PTransform is a new instance of PCollection<String> where every entry of the collection is a text line of the input file.

Since we want the total number of sales per brand as a result, we must group the records accordingly. Therefore, the next step will be to parse every line and create a key/value pair where the key is a brand name and the value is a number of sales. It's worth mentioning that the output PCollection from the previous PTransform will be the input PCollection for this one.

.apply("ParseAndConvertToKV", MapElements.via(
    new SimpleFunction<String, KV<String, Integer>>() {
        @Override
        public KV<String, Integer> apply(String input) {
            String[] split = input.split(",");
            if (split.length < 4) {
                return null;
            }
            String key = split[1];
            Integer value = Integer.valueOf(split[3]);
            return KV.of(key, value);
        }
    }
))

In this step, we use Beam's internal PTransform called MapElements to create a new key/value pair for every input entry, using the provided SimpleFunction implementation.

We then group the number of sales by brand using another Beam transform, GroupByKey. As a result, we have a PCollection of key/value pairs where the key is a brand name and the value is an iterable collection of sales for that brand.

.apply(GroupByKey.<String, Integer>create())

Now we are ready to sum up the number of car sales per brand using our own implementation of a ParDo transform:

.apply("SumUpValuesByKey", ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {

    @ProcessElement
    public void processElement(ProcessContext context) {
        Integer totalSales = 0;
        String brand = context.element().getKey();
        Iterable<Integer> sales = context.element().getValue();
        for (Integer amount : sales) {
            totalSales += amount;
        }
        context.output(brand + ": " + totalSales);
    }
}))
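
As an aside, for a simple per-key sum like this one, Beam also provides a built-in combiner, Sum (from org.apache.beam.sdk.transforms), which can replace both the GroupByKey and the SumUpValuesByKey steps. A sketch of that alternative, where the extra MapElements step only reproduces the "brand: total" formatting used above:

// Combines the values of each key into their sum: PCollection<KV<String, Integer>>.
.apply(Sum.integersPerKey())
// Format each pair as "brand: total", matching the output of the DoFn above.
.apply("FormatResults", MapElements.via(
    new SimpleFunction<KV<String, Integer>, String>() {
        @Override
        public String apply(KV<String, Integer> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
))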

To finalize the pipeline, we apply another IO transform to take the PCollection of strings and write them to a text file:

.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());

The last thing we need to do is run our created pipeline:

pipeline.run();
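
For reference, here is the whole job assembled from the fragments above into a single class. This is only a sketch: the hard-coded paths match the sample data shown below, the class name is taken from the run command in the next section, and the published example on GitHub remains the authoritative version.

package org.apache.beam.tutorial.analytic;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class SalesPerCarsBrand {
    public static void main(String[] args) {
        // Parse command-line arguments (e.g. --runner=DirectRunner) into pipeline options.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            // Read the sales log line by line.
            .apply(TextIO.read().from("/tmp/beam/cars_sales_log"))
            // Parse "id,brand_name,model_name,sales_number" into KV<brand, sales>.
            .apply("ParseAndConvertToKV", MapElements.via(
                new SimpleFunction<String, KV<String, Integer>>() {
                    @Override
                    public KV<String, Integer> apply(String input) {
                        String[] split = input.split(",");
                        if (split.length < 4) {
                            return null;
                        }
                        return KV.of(split[1], Integer.valueOf(split[3]));
                    }
                }))
            // Group all sales numbers by brand.
            .apply(GroupByKey.<String, Integer>create())
            // Sum the sales of each brand and format the result.
            .apply("SumUpValuesByKey", ParDo.of(
                new DoFn<KV<String, Iterable<Integer>>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        Integer totalSales = 0;
                        for (Integer amount : context.element().getValue()) {
                            totalSales += amount;
                        }
                        context.output(context.element().getKey() + ": " + totalSales);
                    }
                }))
            // Write the report into a single output file.
            .apply(TextIO.write().to("/tmp/beam/cars_sales_report").withoutSharding());

        pipeline.run();
    }
}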

Looks quite easy, doesn't it? This is the power of Apache Beam, which allows us to create complicated data processing pipelines with a minimum amount of code.

Those of you familiar with Hadoop may have noticed that this pipeline resembles something familiar:

  • It reads and parses text data line by line, creating new key/value pairs (Map)
  • Then it groups these key/value pairs by key (GroupBy)
  • Finally, it iterates over all values of one key, applying some user function (Reduce)

Yes, that's true - this simple pipeline could be performed with a classic MapReduce job! But just compare how much simpler and clearer it looks in Beam (despite being in Java!), and if we decide to extend our pipeline by adding another transform, it won't become much more complicated.

Building and Running a Pipeline

As I mentioned before, a Beam pipeline can be run on different runners (processing engines):

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

To do this, we just need to add the corresponding dependency to our Maven or Gradle project configuration. The good thing is that we don't have to adjust or rewrite the pipeline code to run it on each runner. Even better, we don't have to recompile our jars if all the required runner dependencies were included beforehand - we just need to choose which runner to use and that's it!

Direct Runner is a local runner which is usually used to test your pipeline. When using Java, you must specify your dependency on the Direct Runner in your pom.xml.

<dependency>
	<groupId>org.apache.beam</groupId>
	<artifactId>beam-runners-direct-java</artifactId>
	<version>2.4.0</version>
	<scope>runtime</scope>
</dependency>

Afterwards, compile your project:

# mvn clean package

And run your pipeline on direct runner:

# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"

For example, if our input file contains the following data:

# cat /tmp/beam/cars_sales_log 
1,Toyota,Prius,3 
2,Nissan,Sentra,2 
1,Toyota,Yaris,4 
3,Ford,Fusion,5 
3,Ford,Kuga,3

Then the final result will be like this:

# cat /tmp/beam/cars_sales_report 
Toyota: 7 
Nissan: 2 
Ford: 8

The list of all supported runners, and instructions on how to use them, can be found on this page.

Finally, all the code for this example is published in this GitHub repository.

In the next part of this series, I will talk about streaming data processing in Beam. I'll take another example of a data analytics task, this time with an unbounded data source, and we will see what Beam provides in that case.


Published at DZone with permission of Alexey Romanenko, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
