Introducing Apache Flink’s State Processor API

DZone 's Guide to

Introducing Apache Flink’s State Processor API

Flink State Processor to the rescue for all your streaming needs.

· Big Data Zone ·
Free Resource


Flink: a group of 12 or more cows... or this cool stream processing thing

How to Read, Write, and Modify the State of Flink Applications

Having the ability to access the state of Apache Flink applications from the outside has been one of the most in-demand features by Flink users for several years. With the introduction of the State Processor API in Flink 1.9 developers and data engineers now can read, write and modify Flink’s savepoints and checkpoints with a powerful extension of the DataSet API.

In this post, we discuss the importance of this feature for Apache Flink, what you can use it for, and how to use it. Finally, we will detail the future of the State Processor API and how it brings together the community’s plans to evolve Flink into a system for unified batch and stream processing.

You may also like: Real-Time Stream Processing With Apache Kafka Part One.

Stateful Stream Processing With Apache Flink Until Flink 1.9

All non-trivial stream processing applications are stateful, and most of them are designed to run for months or years. Over time, many of them accumulate a lot of valuable states that can be very expensive or even impossible to rebuild if they are lost due to a failure.

In order to guarantee the consistency and durability of application state, Flink featured a sophisticated checkpointing and recovery mechanism from very early on. With every release, the Flink community has added more and more state-related features to improve checkpointing and recovery speed, the maintenance of applications, and practices to manage applications.

Flink developers and software engineers have frequently requested the ability to access the state of an application “from the outside.” Such a feature would enable Flink users to validate or debug the state of an application, migrate the state of one application to another, potentially evolve an application from the Heap State Backend to the RocksDB State Backend, or even import the initial state of an application from an external system like a relational database.

Despite many convincing reasons to expose application state externally, your options to access it have been fairly limited until now. Flink’s Queryable State feature only supports key-lookups (point queries) and does not guarantee the consistency of returned values (the value of a key might be different before and after an application recovered from a failure).

Moreover, the Queryable State does not allow you to add or modify the state of an application. Also, savepoints, which are consistent snapshots of an application’s state, could not address the user’s requirements because they encode the state data with a custom binary format.

State Processor API to Read and Write Application State

The State Processor API that comes with Flink 1.9 is a true game-changer in how you can work with application state! In a nutshell, the feature extends the DataSet API with Input and OutputFormats to read and write savepoint or checkpoint data. Due to the interoperability of DataSet and Table API, you can even use relational Table API or SQL queries to analyze and process state data.

For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly. Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application.

It’s also possible to fix inconsistent state entries now. Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started. For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, etc.

Mapping Application State to DataSets

The State Processor API maps the state of a streaming application to one or more data sets that can be separately processed. In order to be able to use the API, you need to understand how this mapping works.

Typically, a stateful Flink job is composed of different operators, one or more source operators, a few operators for the actual processing, and one or more sink operators. Each operator runs in parallel in one or more tasks and can work with different types of state.

An operator can have zero, one, or more “operator states” that are organized as lists scoped to the operator’s tasks. If the operator is applied on a keyed stream, it can also have zero, one, or more “keyed states” which are scoped to a key that is extracted from each processed record. You can think of keyed state as a distributed key-value map.

The following figure shows the application “MyApp,” which consists of three operators called “Src,” “Proc,” and “Snk.” Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.

State Processor API, Apache Flink

State Processor API, Apache Flink

A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored. When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks’ states into data sets or tables. In fact, we can think of a savepoint as a database.

Every operator (identified by its UID) represents a namespace. Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state’s data of all tasks. All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state. The following figure shows how a savepoint of MyApp is mapped to a database.

State Processor API, Apache Flink

State Processor API, Apache Flink

The figure shows how the values of Src’s operator state are mapped to a table with one column and five rows, one row for all list entries across all parallel tasks of Src. Operator state os2 of the operator “Proc” is similarly mapped to an individual table. The keyed states ks1 and ks2 are combined to a single table with three columns, one for the key, one for ks1 and one for ks2. The keyed table holds one row for each distinct key of both keyed states. Since the operator “Snk” does not have any state, its namespace is empty.

The State Processor API now offers methods to create, load, and write a savepoint. You can read a DataSet from a loaded savepoint or convert a DataSet into a state and add it to a savepoint. DataSets can be processed with the full feature set of the DataSet API. With these building blocks, all of the before-mentioned use cases (and more) can be addressed. The Flink documentation provides more information about how to use the State Processor API in detail.

Why DataSet API?

In case you are familiar with Flink’s roadmap, you might be surprised that the State Processor API is based on the DataSet API. The Flink community plans to extend the DataStream API with the concept of BoundedStreams and deprecate the DataSet API.

When designing this feature, the community evaluated the DataStream API or Table API but neither could provide the right feature set. Since the importance of the feature is paramount for the progress of Flink’s APIs, the community decided to build it on the DataSet API, but kept its dependencies on the DataSet API to a minimum. Hence, migrating it to another API should be fairly easy.


Flink users have requested a feature to access and modify the state of streaming applications from the outside for a long time. With the State Processor API, Flink 1.9.0 finally exposes the application state as a data format that can be manipulated.

This feature opens up many new possibilities for how users can maintain and manage Flink streaming applications, including arbitrary evolution of stream applications and exporting and bootstrapping of application state. To put it concisely, the State Processor API unlocks the black box that savepoints used to be until now! Excited yet? Get your hands on the latest Flink release and check out the State Processor API!

Related Articles

apache flink, big data, data processing, open souce, open source big data tools, stream processing

Published at DZone with permission of Fabian Hueske . See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}