
Apache Flink Basic Transformation Example

Apache Flink helps you process big data in an efficient and scalable way. Learn how to use it to read data from a file, transform it to uppercase, and write it to another file.

By Upender Chinthala · Apr. 10, 18 · Tutorial

Apache Flink is a stream processing framework with added capabilities such as batch processing, graph algorithms, machine learning, reporting, and trend analysis. Apache Flink can help you process vast amounts of data in a very efficient and scalable manner.

In this article, we'll be reading data from a file, transforming it to uppercase, and writing it into a different file.

Gradle Dependencies

dependencies {
    compile "org.apache.flink:flink-java:1.4.2"
    compile "org.apache.flink:flink-streaming-java_2.11:1.4.2"
    compile "org.apache.flink:flink-clients_2.11:1.4.2"
}

Core Concepts of the Flink API

[Diagram: data source → transformation → data sink]

When working with the Flink API:

  • DataSource represents a connection to the original data source.
  • Transformation represents what needs to be performed on the events within the data streams. A variety of functions for transforming data are provided, including filtering, mapping, joining, grouping, and aggregating.
  • Data sink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system, printing it to the standard output, writing to the database, or writing to some other application.

The data source and data sink components can be set up easily using the built-in connectors that Flink provides for different kinds of sources and sinks.

Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked.
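
To make this concrete, here is a minimal, self-contained sketch of the source → transformation → sink pipeline. It uses fromElements() as a stand-in source and print() as the sink; neither appears in the file-based example that follows.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyPipelineSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source -> transformation -> sink: at this point the pipeline is only declared.
        env.fromElements("flink", "is", "lazy")
            .map(String::toUpperCase) // transformation
            .print();                 // sink

        // Nothing has run yet; this call is what actually triggers execution.
        env.execute("lazy pipeline sketch");
    }
}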

Code:

package com.uppi.poc.flink;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UpperCaseTransformationApp {

    public static void main(String[] args) throws Exception {
        DataStream<String> dataStream = null;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);
        if (params.has("input") && params.has("output")) {
            // data source
            dataStream = env.readTextFile(params.get("input"));
        } else {
            System.err.println("No input specified. Please run 'UpperCaseTransformationApp --input <path-to-file> --output <path-to-file>'");
            return;
        }
        if (dataStream == null) {
            System.err.println("DataStream created as null, check file path");
            System.exit(1);
            return;
        }
        // transformation
        SingleOutputStreamOperator<String> soso = dataStream.map(String::toUpperCase);
        // data sink
        soso.writeAsText(params.get("output"), WriteMode.OVERWRITE);
        env.execute("read and write");
    }

}

Declaring the DataStream

As you can see, dataStream is initially declared as null; we will create it later from the data source.

DataStream<String> dataStream=null;

Initializing Flink Environment

The next step is to initialize the stream execution environment by calling this helper method:

StreamExecutionEnvironment.getExecutionEnvironment()

Flink figures out which environment the job was submitted to (whether it is a local environment or a cluster environment).

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Data Stream Creation 

To get a data stream from the data source, just call the built-in Flink API method readTextFile() on the StreamExecutionEnvironment. This method reads the content of the given file and returns it as a DataStream.

Example:

dataStream = env.readTextFile(params.get("input"));

ParameterTool

The ParameterTool class represents the user's command-line arguments. Example:

$ flink run flink-basic-example-1.0.jar --input c:\tools\input.txt --output c:\tools\output.txt

We need to send all command-line arguments to the execution environment by calling:

env.getConfig().setGlobalJobParameters(params);
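
As a side note, ParameterTool also offers convenience methods for reading values back out with defaults. A minimal sketch (the --parallelism flag and the default values here are hypothetical, just to illustrate the overloads):

String input = params.getRequired("input");         // throws an exception if --input is missing
String output = params.get("output", "out.txt");    // falls back to a default value
int parallelism = params.getInt("parallelism", 1);  // hypothetical numeric flag with a default
boolean hasInput = params.has("input");              // check presence without reading the value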

Writing Output to Data Sink

print(): The data stream's print() method writes each element of the data stream to the standard output (on a cluster, it ends up in the TaskManager's log file).

writeAsText(): This method takes two arguments: the first argument is the output file/path and the second argument is the write mode.

Example:

soso.writeAsText(params.get("output"),WriteMode.OVERWRITE);
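
For quick inspection during development, print() can be used as the sink instead. A small sketch, reusing the soso stream from the program above:

// Sketch only: print each transformed element instead of (or in addition to) writing a file.
soso.print();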

Trigger Flow Execution

All of the actions specified earlier happen only when the execute method is called. If you don't call the execute method, your program will complete without doing anything. When calling the execute method, you can specify the name of the job. Example:

env.execute("read and write");
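
As a side note (not shown in the original program), execute() also returns a JobExecutionResult that can be inspected once the job finishes, assuming the same Flink 1.4.x API:

// Sketch only: requires import org.apache.flink.api.common.JobExecutionResult
JobExecutionResult result = env.execute("read and write");
System.out.println("Job finished in " + result.getNetRuntime() + " ms");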

Running Flink Application

Step 1: Clone the project from GitHub and run the Gradle command > gradlew clean build. Once the build succeeds, it generates a flink-basic-example-1.0.jar file in the project's /build/libs directory.

Step 2: Run the Flink server on Windows with start-local.bat.

Step 3: Run your Flink application from the command line by going to the Flink installation folder and typing the following command:

flink run <path-to-jar-file> --input <path-to-file> --output <path-to-file>

Example:

input.txt:

training in Big Data Hadoop, Apache Spark, Apache Flink, Apache Kafka, Hbase, Apache Hadoop Admin

output.txt:

TRAINING IN BIG DATA HADOOP, APACHE SPARK, APACHE FLINK, APACHE KAFKA, HBASE, APACHE HADOOP ADMIN

Step 5: Run the Flink application using the Flink web UI. Type localhost:8081 into your browser (by default, Flink runs on port 8081).

Click Submit New Job > Add New Task and upload the generated JAR file. You can generate this JAR with:

gradlew clean build (JAR location: /build/libs)

Just enter the program arguments in the input box as:

--input <path-to-file> --output <path-to-file>

And hit Submit. After the job runs, you can see it listed under Completed Jobs.

Here's the GitHub link.
