Stream Processing and Trending Hashtags

In this tutorial, we'll begin building an application that takes in data from the Twitter API and performs stream processing on hashtags.

Sean Allen
·
Jun. 29, 18 · Tutorial

A prospective Wallaroo user contacted us and asked for an example of chaining state computations together so the output of one could be fed into another to take still further action. In particular, their first step was doing aggregation.

Chaining state computations is a general problem with many applications, and it is straightforward in Wallaroo. To illustrate the concepts with a realistic yet relatively easy-to-understand use case, I decided to go with an updated version of a previous blog post. Back in November of 2017, we published an example Wallaroo app that identified top Twitter hashtags in real time.

My example is a rewrite of the Wallaroo code that powers that example while keeping the supporting Twitter client and Flask-based web application intact.

The original "Trending Hashtags" application differs in a few fundamental ways from our updated example.

First, the original application has no chained state computations; it has only a single state computation. Second, it's not parallelized. There's a single hashtag finder instance and a single state object responsible for tracking the top hashtags.

The Guts

Here's the definition of the data pipeline from our original application:

ab = wallaroo.ApplicationBuilder("Trending Hashtags")

ab.new_pipeline("Tweets_new",
  wallaroo.TCPSourceConfig(in_host, in_port, Decoder()))

ab.to(HashtagFinder)

ab.to_stateful(ComputeHashtags(),
  HashtagsStateBuilder(), "hashtags state")

ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, Encoder()))

Here, to adds a non-parallel stateless computation and to_stateful adds a non-parallel state computation.

The serialized nature of the original example makes the logic very easy to follow, but it can never take advantage of Wallaroo's ability to parallelize computation.

Getting "top K" in a parallel fashion is straightforward but not something a lot of folks have experience with. What you want to do is break your "top" items into a series of smaller parallel aggregates. Creating many smaller parallel aggregates allows you to handle larger incoming streams of data. Each of those smaller aggregates can output its top K as it changes. These are then sent to a single, non-parallelized aggregate that takes the top K from all the smaller aggregates and manages a true "top K" listing.

The assumption is that, for a given time window, there will be far fewer outputs from each "smaller aggregate" than there are inputs at the start of the pipeline. This final aggregation is going to be a bottleneck, and there's nothing we can do about that because our problem requires it. We can, however, decrease the number of messages it receives by doing as much work as possible in parallel before it.
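
Independent of Wallaroo, the two-stage idea can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the names K, NUM_PARTITIONS, partition_for, local_top_k, and global_top_k are hypothetical and do not come from the example application, and the partitions run sequentially here rather than in parallel. The point it shows is that, because every occurrence of a hashtag lands in the same partition, merging the per-partition top K lists yields the same answer as counting everything in one place.

```python
import heapq
from collections import Counter

K = 3               # hypothetical: how many top hashtags we want
NUM_PARTITIONS = 4  # hypothetical: number of smaller aggregates


def partition_for(hashtag):
    # Route every occurrence of a hashtag to the same partition.
    # A byte sum is used instead of hash() so runs are repeatable.
    return sum(hashtag.encode("utf-8")) % NUM_PARTITIONS


def rank(pair):
    # Order by count descending, then hashtag ascending to break ties.
    tag, count = pair
    return (-count, tag)


def local_top_k(counts, k=K):
    # Top K (hashtag, count) pairs for a single partition.
    return heapq.nsmallest(k, counts.items(), key=rank)


def global_top_k(hashtags, k=K):
    # Stage 1: count within independent partitions (parallelizable).
    partitions = [Counter() for _ in range(NUM_PARTITIONS)]
    for tag in hashtags:
        partitions[partition_for(tag)][tag] += 1

    # Stage 2: one aggregate merges the small per-partition results.
    candidates = []
    for counts in partitions:
        candidates.extend(local_top_k(counts, k))
    return heapq.nsmallest(k, candidates, key=rank)
```

The final stage only ever sees at most K entries per partition, which is the reduction in message volume described above.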

In Wallaroo this would look like:

ab = wallaroo.ApplicationBuilder("Trending Hashtags")

ab.new_pipeline("Tweets",
  wallaroo.TCPSourceConfig(in_host, in_port, decoder))

ab.to_parallel(find_hashtags)

ab.to_state_partition(count_hashtags, HashtagCounts,
  "raw hashtag counts", extract_hashtag_key, raw_hashtag_partitions)

ab.to_stateful(top_hashtags, TopTags, "top hashtags")

ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder))

Let's break that apart for folks who aren't familiar with how Wallaroo's Application Builder API works.

We declare a new application called "Trending Hashtags":

ab = wallaroo.ApplicationBuilder("Trending Hashtags") 

The application consists of a single data pipeline, "Tweets." This data pipeline will receive data from the Twitter firehose over TCP:

ab.new_pipeline("Tweets", 
  wallaroo.TCPSourceConfig(in_host, in_port, decoder))

The incoming data will be routed to an instance of a parallelized stateless computation, find_hashtags, which parses each tweet looking for hashtags:

ab.to_parallel(find_hashtags) 
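
The parsing step itself can be very small. The following is a minimal sketch, not the code from the example application: extract_hashtags and HASHTAG_RE are hypothetical names, the function is plain Python with no Wallaroo wiring, and it assumes a hashtag is a "#" followed by one or more word characters.

```python
import re

# Assumption: a hashtag is "#" followed by one or more word characters.
HASHTAG_RE = re.compile(r"#(\w+)")


def extract_hashtags(tweet_text):
    # Return every hashtag in the tweet text, without the leading "#"
    # and lowercased so "#Python" and "#python" are counted together.
    # One tweet can yield zero, one, or many hashtags.
    return [tag.lower() for tag in HASHTAG_RE.findall(tweet_text)]
```

Because the function keeps no state between tweets, any number of copies can run side by side, which is what makes this step a good fit for a parallel stateless computation.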

Any hashtags found in the previous step are sent to a partitioned state computation called count_hashtags. Each partition has its own HashtagCounts object that we use to maintain a listing of hashtags seen and their counts. Data partitioning in Wallaroo is controlled by the developer, so we supply a list of valid partition keys (raw_hashtag_partitions) and a function (extract_hashtag_key) that examines incoming hashtags and extracts a key from them:

ab.to_state_partition(count_hashtags, HashtagCounts, 
  "raw hashtag counts", extract_hashtag_key, raw_hashtag_partitions)
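
To make the partitioning idea concrete, here is one way the key list, the key function, and the per-partition state could look. This is a sketch under assumptions, not the code from the example application: partition_keys, extract_key, and PartitionCounts are hypothetical names, and keying on the first character of the hashtag is just one possible scheme.

```python
import string

# Hypothetical key space: one partition per ASCII letter and digit,
# plus a catch-all for anything else (for example, non-ASCII hashtags).
OTHER = "other"
SINGLE_CHAR_KEYS = set(string.ascii_lowercase + string.digits)
partition_keys = sorted(SINGLE_CHAR_KEYS) + [OTHER]


def extract_key(hashtag):
    # Key on the first character after any leading "#".
    first = hashtag.lstrip("#")[:1].lower()
    return first if first in SINGLE_CHAR_KEYS else OTHER


class PartitionCounts:
    """Counts for one partition; reports its top K only when it changes."""

    def __init__(self, k=3):
        self.k = k
        self.counts = {}
        self.top = []

    def update(self, hashtag):
        self.counts[hashtag] = self.counts.get(hashtag, 0) + 1
        # Count descending, then hashtag ascending to break ties.
        ranked = sorted(self.counts.items(), key=lambda kv: (-kv[1], kv[0]))
        new_top = ranked[: self.k]
        if new_top == self.top:
            return None  # nothing to send downstream
        self.top = new_top
        return list(new_top)
```

In this sketch, a hashtag that is counted but does not enter or alter the partition's top K produces no output, so the next stage receives fewer messages than the partition does.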

Whenever the "top K" for a given "raw hashtag counts" partition changes, a new message will be sent to our final step, a non-parallelized state computation (top_hashtags) that keeps a listing of the current top K hashtags in a state object, TopTags:

ab.to_stateful(top_hashtags, TopTags, "top hashtags") 
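
A final aggregate of this kind can be sketched as a small state object that remembers the latest count reported for each hashtag and recomputes the overall top K on every update. This is an illustration only: GlobalTop is a hypothetical name, not the TopTags class from the example application. It assumes each incoming message is a list of (hashtag, count) pairs from one partition and that counts only ever increase; it ignores the windowing the real application uses and never evicts old entries.

```python
class GlobalTop:
    """Merges per-partition top K updates into one overall top K."""

    def __init__(self, k=3):
        self.k = k
        self.latest = {}  # hashtag -> most recent count reported for it
        self.top = []

    def update(self, partition_top):
        # Record the newest counts from one partition's top K message.
        for tag, count in partition_top:
            self.latest[tag] = count
        # Count descending, then hashtag ascending to break ties.
        ranked = sorted(self.latest.items(), key=lambda kv: (-kv[1], kv[0]))
        new_top = ranked[: self.k]
        if new_top == self.top:
            return None  # overall top K unchanged, emit nothing
        self.top = new_top
        return list(new_top)
```

Returning None when nothing changed mirrors the behavior described here: output is produced only when the overall listing actually changes.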

Whenever the TopTags managed by top_hashtags changes, a message with information about the top tags is output to our sink, which sends it out via TCP:

ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encoder)) 

Conclusion

All the code for our Parallel Twitter Trending Hashtags example is available on GitHub. You can clone the code, install your Python and Wallaroo dependencies, supply your Twitter credentials, and run it to see it in action.

The Wallaroo-specific logic is all in a single file, twitter_wallaroo_app.py. Feel free to dive in and check it out. In a couple of weeks, I'm going to publish a post that looks at how the windowing used to determine trending works in this application.

Published at DZone with permission of Sean Allen, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
