DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Leveraging Apache Flink Dashboard for Real-Time Data Processing in AWS Apache Flink Managed Service
  • Real-Time Streaming Architectures: A Technical Deep Dive Into Kafka, Flink, and Pinot
  • How to Create a Real-Time Scalable Streaming App Using Apache NiFi, Apache Pulsar, and Apache Flink SQL
  • Salesforce Data Cloud: Setting Up and Using the Ingestion API

Trending

  • Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions
  • Compliance Automated Standard Solution (COMPASS), Part 11: Compliance as Code, the OSCAL MCP Server Way
  • Beyond Manual Annotation: Engineering Self-Correcting Pseudo-Labeling Pipelines
  • LLM-Powered Deep Parsing for Industrial Inventory Search
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Apache Flink Basic Transformation Example

Apache Flink Basic Transformation Example

Apache Flink helps build big data in a efficient and scalable way. Learn how to use it to read data from a file, transform it to uppercase, and write it to another file.

By 
Upender Chinthala user avatar
Upender Chinthala
·
Apr. 10, 18 · Tutorial
Likes (5)
Comment
Save
Tweet
Share
21.6K Views

Join the DZone community and get the full member experience.

Join For Free

Apache Flink is a stream processing framework with added capabilities such as batch processing, graph algorithms, machine learning, reports, and trends insight. Using Apache Flink can help you build a vast amount of data in a very efficient and scalable manner.

In this article, we'll be reading data from a file, transforming it to uppercase, and writing it into a different file.

Gradle Dependencies

dependencies {

    compile "org.apache.flink:flink-java:1.4.2"

    compile "org.apache.flink:flink-streaming-java_2.11:1.4.2"

    compile "org.apache.flink:flink-clients_2.11"

}

Core Concept of Flink API

Image title

When working with the Flink API:

  • DataSource represents a connection to the original data source.
  • Transformation represents what needs to be performed on the events within the data streams. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating.
  • Data sink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system, printing it to the standard output, writing to the database, or writing to some other application.

The data source and data sink components can be set up easily using built-in connectors that Flink provides to different kinds of sources and sinks.

Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked.

Code:

package com.uppi.poc.flink;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UpperCaseTransformationApp {

 public static void main(String[] args) throws Exception {
  DataStream < String > dataStream = null;
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  final ParameterTool params = ParameterTool.fromArgs(args);
  env.getConfig().setGlobalJobParameters(params);
  if (params.has("input") && params.has("output")) {
   //data source
   dataStream = env.readTextFile(params.get("input"));
  } else {
   System.err.println("No input specified. Please run 'UpperCaseTransformationApp --input <file-to-path> --output <file-to-path>'");
   return;
  }
  if (dataStream == null) {
   System.err.println("DataStream created as null, check file path");
   System.exit(1);
   return;
  }
  //transformation
  SingleOutputStreamOperator < String > soso = dataStream.map(String::toUpperCase);
  //data sink
  soso.writeAsText(params.get("output"), WriteMode.OVERWRITE);
  env.execute("read and write");
 }

}

DataSet API Transformation

As you can see, dataStream is initialized as null but later, we will create it.

DataStream<String> dataStream=null;

Initializing Flink Environment

The next step is to initialize stream execution environment by calling this helper method:

StreamExecutionEnvironment. getExecutionEnvironment()

Flink figures out which environment you submitted (whether it is a local environment or cluster environment).

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Data Stream Creation 

To get the data stream from the data source, just call the built-in Flink API method readTextFile() from StreamExecutionEnvironment. This method reads file content from a given file and returns it as a dataStream object.

Example:

dataStream.readTextFile(params.get("input"));

ParameterTools

The ParamerterTools class represents user command line arguments. Example:

$ flink run  flink-basic-example-1.0.jar --input c:\tools\input.txt--output c:\tools\output.txt

We need to send all command line arguments to execution environment by calling:

env.getConfig().setGlobalJobParameters(params);

Writing Output to Data Sink

printn(): The data stream print method writes each entity of the data stream to a Flink log file.

writeAsText(): This method has two arguments: the first argument is the output file/path and the second argument is writer mode.

Example:

soso.writeAsText(params.get("output"),WriteMode.OVERWRITE);

Trigger Flow Execution

All actions specified earlier will happen. If you don’t call the execute method, your program will complete without doing anything. When calling the execute method, you can specify the name of the job. Example:

env.execute("read and write");

Running Flink Application

Step 1: Clone the project from GitHub and run the Gradle command >  gradlew clean build. Once the build is a success, it generates a flink-basic-example-1.0.jar file in the current project folder's /build/libs directory.

Step 2: Run the Flink server on Windows with start-local.bat.

Image title

Step 3: Run your own Flink application command line by going to the Flink installation folder and type the following command:

flink run <path-to-jar-file> --input <path-to-file> --output <path-to-file>

Example:

Image title

Image title

input.txt:

training in Big Data Hadoop, Apache Spark, Apache Flink, Apache Kafka, Hbase, Apache Hadoop Admin

output.txt:

TRAINING IN BIG DATA HADOOP, APACHE SPARK, APACHE FLINK, APACHE KAFKA, HBASE, APACHE HADOOP ADMIN

Step 5: Running the Flink application using the Flink web UI. Typelocalhost:8081(by default, Flink runs on port 8081).

Image title

Click Submit New Job > Add New Task and upload generated JAR file. This JAR lets you generate with:

gradlew clean build (jar location  - /build/libs)

Image title

Just enter the program arguments input box as:

--input <path-to-file> --output <path-to-file>

And hit Submit. After running the job, you will see the below screen if you hit Completed Jobs.

Image title

Here's the GitHub link.

Apache Flink Data stream

Opinions expressed by DZone contributors are their own.

Related

  • Leveraging Apache Flink Dashboard for Real-Time Data Processing in AWS Apache Flink Managed Service
  • Real-Time Streaming Architectures: A Technical Deep Dive Into Kafka, Flink, and Pinot
  • How to Create a Real-Time Scalable Streaming App Using Apache NiFi, Apache Pulsar, and Apache Flink SQL
  • Salesforce Data Cloud: Setting Up and Using the Ingestion API

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook