
An Introduction to Temporal Data Handling With Apache Flink

Learn how to express continuous stream enrichment in relational and time-varying terms using Flink.

By Marta Paes · May 24, 2019 · Tutorial


Gone are the days when developers battled hard to figure out how to manage and model temporal data for effective point-in-time analysis. The introduction of temporal tables in the SQL standard, in 2011, made it easier to validate or enrich data against dynamically changing, historical datasets. Prior to that, users were doomed to implement this as part of the application logic, often stretching the development lifecycle and hurting the maintainability of the code.


For example: given a stream with Taxi Fare events tied to the local currency of the ride location, we might want to convert the fare price to a common currency for further processing. As conversion rates excel at fluctuating over time, each Taxi Fare event would need to be matched to the rate that was valid at the time the event occurred in order to produce a reliable result.
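
To make this concrete, consider the hypothetical EUR rates and fare events below (illustrative values, not from the original article); each fare must be matched against the rate in effect at its own timestamp, not the latest one:

Rates history (EUR):     Taxi fare events:
09:00 -> rate 114        10:45, 20 EUR  => converted with rate 114
11:10 -> rate 116        11:15, 20 EUR  => converted with rate 116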

Modeling Temporal Data With Apache Flink

Flink 1.7.0 introduced the concept of temporal tables into its streaming SQL and Table API: parameterized views on append-only tables (that is, tables that only allow records to be inserted, never updated or deleted) that are interpreted as a changelog. Because the data stays closely tied to its time context, it can be interpreted as valid only within a specific period of time. There are two basic requirements to “elevate” a stream into a temporal table:

  1. Define a primary key and a versioning field that can be used to keep track of the changes that happen over time.

  2. Expose the stream as a temporal table function that maps each point in time to a static relation.

Going back to our example use case, a temporal table is just what we need to model the conversion rate data so that it becomes useful for point-in-time querying. Temporal table functions are implemented as an extension of Flink’s generic table function class and can be defined in the same straightforward way for use with the Table API or SQL parser.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

(...)

// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

// Provide a sample static data set for the rates history table.
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();

ratesHistoryData.add(Tuple2.of("USD", 102L));
ratesHistoryData.add(Tuple2.of("EUR", 114L));
ratesHistoryData.add(Tuple2.of("YEN", 1L));
ratesHistoryData.add(Tuple2.of("EUR", 116L));
ratesHistoryData.add(Tuple2.of("USD", 105L));

// Create and register an example table using the sample data set.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);

Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");

tEnv.registerTable("RatesHistory", ratesHistory);

// Create and register the temporal table function "Rates".
// Define "r_proctime" as the versioning field and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");

tEnv.registerFunction("Rates", rates);

(...)

What does this Rates function do, in practice? Imagine we would like to check what the conversion rates looked like at a given time — say, 11:00. We could simply do something like:

SELECT * FROM Rates('11:00');
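
Given the sample data registered above, and assuming all five records had arrived before 11:00 (the table is versioned on processing time), the query would conceptually return the latest rate per currency:

r_currency   r_rate
USD          105
EUR          116
YEN          1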

Even though Flink does not yet support querying temporal table functions with a constant time attribute parameter, these functions can be used to cover a much more interesting scenario: temporal table joins.

Streaming Joins Using Temporal Tables

Temporal tables reach their full potential when used in combination — erm, joined — with streaming data, for instance to power applications that must continuously whitelist against a reference dataset that changes over time for auditing or regulatory compliance. Efficient joins have long been a challenge for query processors, given their computational cost and resource consumption; joins over streaming data carry a couple of additional challenges:

  • The unbounded nature of streams means that inputs are continuously evaluated and intermediate join results can consume memory resources indefinitely. Flink gracefully manages its memory consumption out of the box (even for heavier cases where joins require spilling to disk) and supports time-windowed joins to bound the amount of data that needs to be kept as state; a minimal sketch follows this list.
  • Streaming data might be out-of-order and late, so it is not possible to enforce an ordering upfront, and time handling requires some thought to avoid unnecessary outputs and retractions.
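
To make the time-windowed option concrete, here is a minimal sketch in Flink SQL; the Orders and Shipments tables and their time attributes are hypothetical, not part of this article's example:

SELECT o.id, o.order_time, s.ship_time
FROM Orders AS o,
     Shipments AS s
WHERE o.id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;

Because a shipment can only match orders from the preceding four hours, Flink can discard older join state once the window has passed.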

In the particular case of temporal data, time-windowed joins are not enough (well, at least not without some expensive tweaking): sooner or later, each reference record will fall outside of the window and be wiped from state, making it unavailable for future join results. To address this limitation, Flink has introduced support for temporal table joins to cover time-varying relations.


Each record from the append-only table on the probe side (Taxi Fare) is joined with the version of the record from the temporal table on the build side (Conversion Rate) that matches the probe-side record's time attribute (time) for the same value of the primary key (currency). Remember the temporal table function (Rates) we registered earlier? It can now be used to express this join as a simple SQL statement that would otherwise require a heavier statement with a subquery.

Regular Join

SELECT SUM(tf.price * rh.rate) AS conv_fare
FROM taxiFare AS tf,
     ratesHistory AS rh
WHERE tf.currency = rh.currency
      AND rh.time = (SELECT MAX(rh2.time)
                     FROM ratesHistory AS rh2
                     WHERE rh2.currency = tf.currency
                       AND rh2.time <= tf.time);

Temporal Table Join

SELECT tf.time,
       tf.price * rh.rate AS conv_fare
FROM taxiFare AS tf,
     LATERAL TABLE(Rates(tf.time)) AS rh
WHERE tf.currency = rh.currency;


Temporal table joins support both processing and event time semantics and effectively limit the amount of data kept in state while allowing records on the build side to be arbitrarily old, as opposed to time-windowed joins. Probe-side records only need to be kept in state for a very short time to ensure correct semantics in case there are out-of-order records. The challenges mentioned at the beginning of this section are overcome by:

  • Narrowing the scope of the join: only the time-matching version of ratesHistory is visible for a given taxiFare.time.
  • Pruning unneeded records from state: for cases using event time, records between the current time and the watermark delay are persisted for both the probe and build side. These are discarded as soon as the watermark arrives and the results are emitted, allowing the join operation to move forward in time and the build table to “refresh” its version in state. A sketch of an event-time setup follows this list.
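
Below is a minimal sketch of what an event-time setup could look like, reusing the env and tEnv from the earlier snippet. The Tuple3 data, the timestamps, and the EventTimeRates name are illustrative assumptions, not code from the original article:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TemporalTableFunction;

(...)

// Switch the pipeline to event-time semantics (hypothetical setup).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Each rate update now carries its own event timestamp in the third field.
List<Tuple3<String, Long, Long>> timedRatesData = new ArrayList<>();
timedRatesData.add(Tuple3.of("EUR", 114L, 1000L));
timedRatesData.add(Tuple3.of("EUR", 116L, 5000L));

// Assign timestamps and (ascending) watermarks from the embedded field.
DataStream<Tuple3<String, Long, Long>> timedRatesStream = env
    .fromCollection(timedRatesData)
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<String, Long, Long> rate) {
                return rate.f2;
            }
        });

// ".rowtime" turns the Long timestamp field into an event-time attribute.
Table timedRatesHistory = tEnv.fromDataStream(
    timedRatesStream, "r_currency, r_rate, r_rowtime.rowtime");

// Version the temporal table on event time instead of processing time.
TemporalTableFunction eventTimeRates =
    timedRatesHistory.createTemporalTableFunction("r_rowtime", "r_currency");
tEnv.registerFunction("EventTimeRates", eventTimeRates);

A fare stream carrying its own rowtime attribute could then be joined via LATERAL TABLE(EventTimeRates(tf.rowtime)), with state pruned as described above once the watermark passes.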

With temporal table joins, it is now possible to express continuous stream enrichment in relational and time-varying terms using Flink without dabbling in syntactic patchwork or compromising performance. Extending this syntax to batch processing for enriching historical data with proper (event) time semantics is also part of the Flink roadmap!


For a deeper dive into the hardships of joining data in streaming environments, watch Piotr Nowojski’s talk at Flink Forward San Francisco 2019, “How to Join Two Data Streams?”, or visit the Apache Flink documentation. If you would like to get some hands-on practice joining streams with Flink SQL (and Flink SQL in general), check out the free Flink SQL training. The training environment is based on Docker and can be set up in just a few minutes.


Published at DZone with permission of Marta Paes. See the original article here.
