Real-Time Streaming Pattern: Joining Event Streams

In this post, we look at data processing patterns used to build event-triggered streaming applications, focusing on joining event streams.

Introduction

This week I continue my series of posts looking at data processing patterns used to build event-triggered streaming applications, focusing on joining event streams. I'll cover some related use cases and how you would go about implementing them within Wallaroo.

The purpose of these posts is to help you understand the data processing use cases that Wallaroo is best designed to handle and how you can go about building them.

I will be looking at the Wallaroo application builder, the part of your application that hooks into the Wallaroo framework.

Check out my previous posts examining streaming patterns: Triggering Alerts and Preprocessing for Sentiment Analysis.

Pattern: Joining Event Streams

The joining event streams pattern takes multiple data pipelines and joins them to produce a new signal message that can be acted upon by a later process.

This pattern is used in a variety of use cases. Here are a few examples:

  • Merging data for an individual across a variety of social media accounts.
  • Merging click data from a variety of devices (e.g. mobile and desktop) for an individual user.
  • Tracking locations of delivery vehicles and assets that need to be delivered.
  • Monitoring electronic trading activity for clients on a variety of trading venues.

Use Case

A good example is one that we've looked at in previous Wallaroo posts: identifying loyal customers for segmentation.

For the purpose of this post, I've simplified the use case and adapted the application builder code.

The simplified use case is as follows: an email promotion is sent to the individual who clicks on an ad if they have been identified as a loyal customer.

This use case requires two event streams. The first ingests records for identified loyal customers and saves them to a state object. The second ingests a stream of click data. When an incoming click was performed by an identified loyal customer, that click triggers an email with the promotion.
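The flow described above can be sketched in plain Python, independent of Wallaroo. The names `loyal_customers`, `on_loyalty_record` and `on_click`, and the shape of the events, are hypothetical and chosen only for illustration. The sketch also shows one property worth keeping in mind: in this simple form, a click that arrives before the matching loyalty record produces no email trigger.

```python
# Plain-Python sketch of the use case. All names and event shapes here are
# hypothetical; none of this is Wallaroo API.

loyal_customers = set()  # stands in for the shared state object


def on_loyalty_record(customer_id):
    """Stream 1: remember that this customer is loyal."""
    loyal_customers.add(customer_id)


def on_click(customer_id, ad_id):
    """Stream 2: return an email trigger only for known loyal customers."""
    if customer_id in loyal_customers:
        return {"send_promo_to": customer_id, "ad_id": ad_id}
    return None  # not known to be loyal at this moment: no email


# Events from the two streams arrive interleaved.
early_click = on_click("alice", "ad-1")   # arrives before the loyalty record
on_loyalty_record("alice")
later_click = on_click("alice", "ad-2")   # arrives after the loyalty record
other_click = on_click("bob", "ad-2")     # bob was never marked as loyal
```

Only `later_click` carries a trigger here; the other two calls return `None`.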

Wallaroo Application Builder

Overview

Application Diagram

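import wallaroo


def application_setup(args):
    # The host, port, decoder and encoder names used below are assumed to be
    # defined elsewhere in the application module.
    ab = wallaroo.ApplicationBuilder("Joining Streams Example")

    # Pipeline 1: save loyal customer records to partitioned state.
    ab.new_pipeline("Loyal Customer Stream", wallaroo.TCPSourceConfig(ll_host, ll_port, ll_decoder))
    ab.to_state_partition(save_loyal_customer, LoyaltyCustomers, "loyalty customers", extract_customer_key)
    ab.done()

    # Pipeline 2: check each click against the same partitioned state.
    ab.new_pipeline("Click Stream", wallaroo.TCPSourceConfig(cc_host, cc_port, cc_decoder))
    ab.to_state_partition(check_loyal_click, LoyaltyCustomers, "loyalty customers", extract_customer_key)
    ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, cc_encoder))
    return ab.build()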

Wallaroo Application Builder Code

Check out the Wallaroo API reference for detailed information about the application builder and computation options.

Now let's break down and describe the individual lines of the application builder.

ab.new_pipeline("Loyal Customer Stream", wallaroo.TCPSourceConfig(ll_host, ll_port, ll_decoder)) 

Defines the first Wallaroo pipeline, including its name, "Loyal Customer Stream", and the source of its data.

ab.to_state_partition(save_loyal_customer, LoyaltyCustomers, "loyalty customers", extract_customer_key) 

This step is a stateful partition that calls the function save_loyal_customer. Since this is a partitioning step, Wallaroo automatically routes the data for a specific customer to wherever the state object for that customer lives. The routing is driven by the key that extract_customer_key returns.
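To make the routing idea concrete, here is a minimal sketch of key-based routing. It assumes that records are dicts with a `customer_id` field, gives `extract_customer_key` a body that is only a guess at what the real function does, and uses a hypothetical `route_to_partition` helper with a simple hash-modulo scheme. How Wallaroo actually routes messages internally is not covered in this post and may differ.

```python
import zlib


def extract_customer_key(record):
    """Assumed body: use the customer id, as a string, as the partition key."""
    return str(record["customer_id"])


def route_to_partition(key, num_partitions):
    """Hypothetical helper: map a key to a partition index.

    crc32 is used instead of hash() so the result is stable across runs.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


loyalty_record = {"customer_id": 42, "tier": "gold"}
click_record = {"customer_id": 42, "ad_id": "ad-1"}

loyalty_partition = route_to_partition(extract_customer_key(loyalty_record), 4)
click_partition = route_to_partition(extract_customer_key(click_record), 4)
```

Because both records yield the same key, they land on the same partition, which is what lets a click find the state written by the loyalty record.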

ab.new_pipeline("Click Stream", wallaroo.TCPSourceConfig(cc_host, cc_port, cc_decoder))

Defines the second Wallaroo pipeline, including its name, "Click Stream", and the source of its data.

ab.to_state_partition(check_loyal_click, LoyaltyCustomers, "loyalty customers", extract_customer_key)

This step makes use of the same stateful partition that was defined in the first pipeline, but calls the function check_loyal_click, which checks whether the customer who performed the click is indeed a loyal customer.

This is the way that you implement joining in Wallaroo, by having a computation in each pipeline that makes use of a shared state object. Each of these computations will interact with the state object and perform the required join logic.
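The shared-state join can be sketched as a state class plus one function per pipeline. The names `LoyaltyCustomers`, `save_loyal_customer` and `check_loyal_click` come from the application builder above, but their bodies below are assumptions, written as plain Python without any Wallaroo decorators. The `(output, state_changed)` return shape is likewise an assumption made for this sketch; see the Wallaroo API reference for the actual computation signature.

```python
class LoyaltyCustomers(object):
    """Assumed state object: the loyal customer ids seen by one partition."""

    def __init__(self):
        self.customer_ids = set()


def save_loyal_customer(record, state):
    """Pipeline 1: record the customer as loyal and emit nothing."""
    state.customer_ids.add(record["customer_id"])
    return (None, True)  # no output; state was modified


def check_loyal_click(click, state):
    """Pipeline 2: pass the click along only if the customer is loyal."""
    if click["customer_id"] in state.customer_ids:
        return (click, False)  # emit the click; state untouched
    return (None, False)  # drop the click; state untouched


# Both functions are handed the same state object, which is the join.
state = LoyaltyCustomers()
save_loyal_customer({"customer_id": "c1"}, state)
hit, _ = check_loyal_click({"customer_id": "c1", "ad_id": "ad-9"}, state)
miss, _ = check_loyal_click({"customer_id": "c2", "ad_id": "ad-9"}, state)
```

Any join logic could go inside these two functions, for example expiring loyalty records after some time; the sketch keeps only the membership check described in the use case.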

ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, cc_encoder)) 

In the last step, we will pass data out of Wallaroo for further processing. In this case, we will only pass along messages for loyal customers to be processed by an email server external to Wallaroo.
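As a rough illustration of this last step, the sketch below drops empty results and encodes the remaining clicks as bytes for the sink. The function `encode_click` is a hypothetical stand-in for `cc_encoder`, the helper `frames_for_sink` is invented for this example, and newline-delimited JSON is an assumed wire format; the post does not say what format the email server expects.

```python
import json


def encode_click(click):
    """Hypothetical stand-in for cc_encoder: one JSON document per line."""
    return (json.dumps(click, sort_keys=True) + "\n").encode("utf-8")


def frames_for_sink(outputs):
    """Hypothetical helper: keep only real outputs and encode each one."""
    return [encode_click(o) for o in outputs if o is not None]


# One click from a loyal customer, one click that was filtered out upstream.
frames = frames_for_sink([{"customer_id": "c1", "ad_id": "ad-9"}, None])
```

Here `frames` holds a single encoded message, since the `None` entry never reaches the sink.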

Conclusion

The joining streams pattern is used frequently when building streaming data applications. Because Wallaroo lets you implement whatever join logic you require, it is a very powerful model.

Give Wallaroo a Try

We hope that this post has piqued your interest in Wallaroo!

If you are just getting started, we recommend you try our Docker image, which allows you to get Wallaroo up and running in only a few minutes.

Topics:
big data, event streams, real-time streaming, tutorial, wallaroo

Published at DZone with permission of
