Apache Flink Basic Transformation Example

Apache Flink helps you process big data in an efficient and scalable way. Learn how to use it to read data from a file, transform it to uppercase, and write it to another file.

By Upender Chinthala · Apr. 10, 18 · Tutorial

Apache Flink is a stream processing framework with added capabilities such as batch processing, graph algorithms, machine learning, reports, and trend analysis. Apache Flink can help you process vast amounts of data in a very efficient and scalable manner.

In this article, we'll be reading data from a file, transforming it to uppercase, and writing it into a different file.

Gradle Dependencies

dependencies {
    compile "org.apache.flink:flink-java:1.4.2"
    compile "org.apache.flink:flink-streaming-java_2.11:1.4.2"
    compile "org.apache.flink:flink-clients_2.11:1.4.2"
}

Core Concept of Flink API

[Diagram: DataSource → Transformation → Data Sink]

When working with the Flink API:

  • DataSource represents a connection to the original data source.
  • Transformation represents what needs to be performed on the events within the data streams. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating.
  • Data sink represents the destination for the result of the program, such as saving the result to the file system, printing it to the standard output, writing to a database, or writing to some other application.

The data source and data sink components can be set up easily using built-in connectors that Flink provides for many different kinds of sources and sinks, as in the sketch below.
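
For example, a Kafka source can be attached in just a few lines. The sketch below is illustrative and not part of this tutorial's project: it assumes the flink-connector-kafka-0.11_2.11 dependency is on the classpath, a broker running at localhost:9092, and a hypothetical topic name.

// Assumes imports for java.util.Properties,
// org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011,
// and org.apache.flink.api.common.serialization.SimpleStringSchema.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// "lines" is a hypothetical topic name used only for illustration
DataStream<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer011<>("lines", new SimpleStringSchema(), props));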

Flink transformations are lazy, meaning that they are not executed until execute() is called on the environment.

Code:

package com.uppi.poc.flink;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UpperCaseTransformationApp {

    public static void main(String[] args) throws Exception {
        DataStream<String> dataStream = null;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);
        if (params.has("input") && params.has("output")) {
            // data source: read the input file line by line
            dataStream = env.readTextFile(params.get("input"));
        } else {
            System.err.println("No input specified. Please run 'UpperCaseTransformationApp --input <path-to-file> --output <path-to-file>'");
            return;
        }
        if (dataStream == null) {
            System.err.println("DataStream created as null, check file path");
            System.exit(1);
            return;
        }
        // transformation: uppercase every line
        SingleOutputStreamOperator<String> soso = dataStream.map(String::toUpperCase);
        // data sink: write the transformed lines to the output file
        soso.writeAsText(params.get("output"), WriteMode.OVERWRITE);
        env.execute("read and write");
    }
}

DataStream API Transformation

As you can see, dataStream is initialized to null; we assign it later, once the input path is known.

DataStream<String> dataStream=null;

Initializing Flink Environment

The next step is to initialize the stream execution environment by calling this helper method:

StreamExecutionEnvironment.getExecutionEnvironment()

Flink figures out which environment the application was submitted to (a local environment or a cluster environment) and creates the appropriate one.

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
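
If you ever need a specific environment rather than the auto-detected one (for example, in a unit test), you can create a local environment explicitly. A minimal sketch:

// Sketch: pin execution to an in-JVM local environment instead of auto-detection.
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();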

Data Stream Creation 

To get a data stream from the data source, call the built-in Flink API method readTextFile() on the StreamExecutionEnvironment. This method reads the contents of the given file and returns it as a DataStream<String>.

Example:

dataStream = env.readTextFile(params.get("input"));
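
For quick local experiments, you can also build a stream from in-memory values instead of a file. A small sketch with made-up sample data:

// Sketch: an in-memory data source, useful for trying the same pipeline without files.
DataStream<String> sample = env.fromElements("apache flink", "stream processing");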

ParameterTool

The ParameterTool class represents the user's command-line arguments. Example:

$ flink run flink-basic-example-1.0.jar --input c:\tools\input.txt --output c:\tools\output.txt

We need to pass all command-line arguments to the execution environment by calling:

env.getConfig().setGlobalJobParameters(params);
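
ParameterTool also offers defaults and typed lookups, which keep argument handling compact. A small sketch (the parallelism argument is hypothetical, used only for illustration):

// Sketch: defaults and typed access with ParameterTool.
String input = params.get("input", "input.txt");   // falls back to "input.txt" if --input is absent
int parallelism = params.getInt("parallelism", 1); // hypothetical optional numeric argument
boolean hasOutput = params.has("output");          // true only if --output was passed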

Writing Output to Data Sink

print(): Writes each element of the data stream to standard output (when running on a cluster, the output ends up in the TaskManager's out file).

writeAsText(): This method takes two arguments: the first is the output file/path and the second is the write mode.

Example:

soso.writeAsText(params.get("output"),WriteMode.OVERWRITE);
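
While developing, it can be convenient to attach a print() sink alongside (or instead of) the text file sink. A one-line sketch:

// Sketch: print each transformed element instead of writing it to a file.
soso.print();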

Trigger Flow Execution

This is where all of the operations specified earlier actually happen. If you don't call the execute() method, your program will complete without doing anything. When calling the execute() method, you can specify a name for the job. Example:

env.execute("read and write");
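
execute() also returns a JobExecutionResult, which you can use to inspect the finished job. A minimal sketch (assumes imports for org.apache.flink.api.common.JobExecutionResult and java.util.concurrent.TimeUnit):

// Sketch: capture the result of execute() to read job metadata such as runtime.
JobExecutionResult result = env.execute("read and write");
System.out.println("Job ran for " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");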

Running Flink Application

Step 1: Clone the project from GitHub and run the Gradle command gradlew clean build. Once the build succeeds, it generates the flink-basic-example-1.0.jar file in the project's /build/libs directory.

Step 2: Run the Flink server on Windows with start-local.bat.


Step 3: Run your Flink application from the command line by going to the Flink installation folder and typing the following command:

flink run <path-to-jar-file> --input <path-to-file> --output <path-to-file>

Example:

flink run flink-basic-example-1.0.jar --input c:\tools\input.txt --output c:\tools\output.txt

input.txt:

training in Big Data Hadoop, Apache Spark, Apache Flink, Apache Kafka, Hbase, Apache Hadoop Admin

output.txt:

TRAINING IN BIG DATA HADOOP, APACHE SPARK, APACHE FLINK, APACHE KAFKA, HBASE, APACHE HADOOP ADMIN

Step 4: Run the Flink application using the Flink web UI. Type localhost:8081 into your browser (by default, the Flink web UI runs on port 8081).


Click Submit New Job > Add New Task and upload the generated JAR file. You can generate this JAR with:

gradlew clean build (JAR location: /build/libs)


Just enter the program arguments in the input box as:

--input <path-to-file> --output <path-to-file>

And hit Submit. After the job runs, you will find it listed under Completed Jobs.


Here's the GitHub link.


Opinions expressed by DZone contributors are their own.
