
Converting a Batch Job to Real-time

In this post, I am going to take an application that would traditionally use batch processing and show how you could make it a real-time streaming application.

By Erik Nilsen · Sep. 17, 18 · Tutorial

Introduction

Often called stream processing, real-time processing allows applications to run computations and filter data at any scale. At Wallaroo Labs, we build and offer support for an open source, event-based stream processing framework called Wallaroo. Frameworks like Wallaroo let you do highly parallel computation across clusters of workers without having to worry about the additional complexity.

One of the things we hear from developers who aren’t familiar with stream processing is that they aren’t sure about the use cases. They’re used to using a periodic cron job to do calculations over data at a certain interval. In this post, I am going to take an application that would traditionally use batch processing and show how you could make it a real-time streaming application. This will allow our application to go from periodically triggering our application logic to running the same logic with real-time results.

For this example, imagine that you want to take some data and let users set alerts on it. Using Django and Celery, I've created an application that ingests data from Coinbase using the coinbase-pro btc-usd websocket.

wsClient = coinbaseWebsocketClient()  # our subclass of the coinbase-pro WebSocket client
wsClient.start()  # open the connection and start receiving messages
# ...
wsClient.close()  # shut the connection down when we are done

Using the coinbase-pro client, connecting to and managing the WebSocket connection is pretty straightforward. Since we only care about the prices Bitcoin is being bought and sold at, we filter out all the other kinds of messages. Once these transactions are saved to a SQLite database, we're able to perform our calculations.
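For reference, here is a minimal sketch of what that subclass could look like, assuming the coinbase-pro client's WebsocketClient base class with on_open and on_message callbacks; the save_transaction helper is hypothetical and stands in for whatever writes each trade to SQLite.

import cbpro  # assumed package name for the coinbase-pro client

class coinbaseWebsocketClient(cbpro.WebsocketClient):
    def on_open(self):
        # Subscribe to the btc-usd product feed.
        self.products = ["BTC-USD"]

    def on_message(self, msg):
        # Keep only completed trades ("match" messages); ignore everything else.
        if msg.get("type") != "match":
            return
        save_transaction(price=msg["price"], side=msg["side"])  # hypothetical helper that persists to SQLite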

Celery Periodic Task Structure

I chose to use Celery to run our periodic tasks. Setting up Celery was pretty simple: install the pip package and import celery and crontab. For the purpose of this blog post, our calculation is straightforward: users set an alert on a price, and we send a notification when the average of the last ten minutes of BTC transactions is greater than the specified threshold (you can view the full file here).

@app.task
def notify_on_price():
    # Average BTC price over the last ten minutes.
    avg_price = calculate_average_price()
    # All alerts whose threshold the current average has crossed.
    alerts = get_alerts(avg_price)
    for alert in alerts:
        notify_user(alert, avg_price)
    return True
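To make the ten-minute interval concrete, a Celery beat schedule along these lines would trigger the task; the schedule name and module path below are assumptions, not taken from the original project.

from celery.schedules import crontab

# Hypothetical beat schedule: run notify_on_price every ten minutes.
app.conf.beat_schedule = {
    "notify-on-price": {
        "task": "tasks.notify_on_price",  # module path is an assumption
        "schedule": crontab(minute="*/10"),
    },
}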

Stream Processing Overview

There are quite a few problems with the approach above. Batch jobs are hard to scale, and if our jobs were to take longer than 10 minutes to run, things really become a problem. Our users are only getting notifications once every ten minutes; ideally, an alert is sent as soon as the average price of Bitcoin changes. Imagine if we later decided to use this application to purchase and sell Bitcoin; we'd certainly need to react to prices much faster.

One way this could be done is by using a stream processor. Rather than batching computation over a larger set of data, we run our application logic on each piece of data individually.

Wallaroo Application Structure

Our application is a perfect use case for Wallaroo. We have data coming from Coinbase and can save the average price and our users' alerts in Wallaroo as state objects. If you need a refresher on Wallaroo terminology, check out our core-concepts.

For this to work, we need two different pipelines: one for adding new price data from Coinbase, and the other for storing alert data from our Django application. Pipelines in Wallaroo are how you split up your application logic. Each pipeline has its own source, and messages from the source are processed sequentially through the pipeline's computations. A computation can access state both inside and outside its own pipeline, which is how updates to buy/sell prices always read the most up-to-date alert settings set by a separate pipeline.

[Diagram: high-level Wallaroo architecture]

Normally, running application logic on each piece of data as it flows through would be considered expensive, and we might batch operations to save time or resources. Stream processors like Wallaroo make this style of computation fast through parallelism and the ability to scale out.

Let's take a quick look at a few pieces of code to show the differences between the two applications. The full application is available here.

class Alerts(object):
    def __init__(self):
        # Maps a price threshold to the list of user ids to notify at that price.
        self.alerts = dict()

Rather than accessing our alerts from the database as we did in the Celery example, our Wallaroo application initializes an Alerts object that stores our alerts in a Python dictionary. Additionally, we provide two methods to access this object: one to add to and one to remove from the dictionary. Alerts.alerts will eventually look like this: {"price_to_notify": [user_id1, user_id2, ...]}.
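A minimal sketch of what those two accessors might look like; remove matches the call used later in the computation, while add and its signature are assumptions for illustration:

    def add(self, price, user_id):
        # Register another user to be notified at this price threshold.
        self.alerts.setdefault(price, []).append(user_id)

    def remove(self, price):
        # Drop a threshold once its alerts have been sent.
        self.alerts.pop(price, None)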

class BTCPrice(object):
    def __init__(self):
        self.count = 0                    # number of trades seen so far
        self.total = decimal.Decimal()    # running sum of trade prices
        self.average = decimal.Decimal()  # total / count, recalculated as trades arrive

Our price object also looks a bit different from our Celery example. With Celery, we were using SQLite's AVG function to take the average of all the prices that came in during a predefined time interval. In our Wallaroo application, I keep a count of the number of results we've seen so far, the running total, and the current average. The average is calculated by dividing the total by the count. It's a fairly basic calculation, but you could use any Python library to do this as well; things like Pandas and NumPy work great with Wallaroo.
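The update step itself is just the running-average arithmetic described above; the method name and exact call site are assumptions for illustration:

    def update(self, price):
        # Fold one more trade price into the running average.
        self.count += 1
        self.total += decimal.Decimal(price)
        self.average = self.total / self.count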

The computation logic is very similar to what we were doing with Celery. The Wallaroo computations (view the full file here) may be more explicit, but both extract the price data from Coinbase, calculate the average price, and then check to see if any users' alert thresholds were crossed.

def maybe_send_alerts_based_on_average_price(btc_price, alerts):
    # ...
    # Collect every alert whose threshold the current average has crossed.
    for (k, v) in alerts.alerts.items():
        if decimal.Decimal(k) <= btc_price.average:
            notify[k] = list(v)  # notify is built in the elided code above
            alerts.remove(k)
    # ...
    return (None, False)

Even though the maybe_send_alerts_based_on_average_price function is called in the pipeline responsible for keeping track of the average BTC price, we are able to pass our Alerts object to this pipeline. This means we are always using the most recent dictionary of alerts rather than needing to query for all of the alerts that match certain criteria.

If you haven’t already, go ahead and try running this application on your own. Clone the repository here and start messing around with different intervals or add the ability to set alerts on eth-usd on the same pipeline.

Conclusion

As you can see, while there are a few differences between our Celery logic and our Wallaroo logic, the advantages of running our application logic with a stream processor instead of batching up our computation are quite large. We're able to go from running our logic periodically to receiving notifications in real time.

Wallaroo allows us to avoid all the problems we first talked about. We went from running somewhat simple logic every ten minutes to being able to react to prices in real time. This is great if we want to add more functionality to our application, like buying and selling on our behalf or viewing real-time charts of this data.

There are many different use cases for choosing a stream processor over batch processing, many of which have been covered as examples on our blog.

Thanks to my coworkers Simon, Nisan, Andy, and Jonathan for providing feedback on both the blog post and the application.


Published at DZone with permission of Erik Nilsen, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
