Streamdrill and the Top-K-Problem
In my last post, I discussed what streamdrill does: it solves the top-k problem in real time, which consists of counting the activities of different event types over a certain time interval and identifying the most active ones.
So far, so good, but you might wonder why you couldn’t just do this by yourself. After all, it’s just counting, right? Actually, the problem that streamdrill solves is simple, as long as:
- the data volume is small
- the time windows are small compared to the data volume
- the number of different kinds of events is small and bounded
- you don’t need the results in real-time
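Under those conditions, a naive exact solution really is only a few lines. The following Python sketch (the class and method names are hypothetical, not streamdrill’s API) keeps every event of the time window in memory, maintains one counter per event type, and computes the top-k on demand. It assumes that events arrive with non-decreasing timestamps. Its memory grows with both the number of events in the window and the number of distinct event types.

```python
import heapq
from collections import Counter, deque


class SlidingWindowTopK:
    """Naive exact top-k over the last `window` time units (hypothetical sketch)."""

    def __init__(self, window):
        self.window = window
        self.events = deque()    # every (timestamp, key) still inside the window
        self.counts = Counter()  # one counter per distinct key in the window

    def add(self, timestamp, key):
        # Assumes timestamps arrive in non-decreasing order.
        self.events.append((timestamp, key))
        self.counts[key] += 1
        self._expire(timestamp)

    def _expire(self, now):
        # Drop events that have fallen out of the window and undo their counts.
        while self.events and self.events[0][0] <= now - self.window:
            _, old_key = self.events.popleft()
            self.counts[old_key] -= 1
            if self.counts[old_key] == 0:
                del self.counts[old_key]

    def top(self, k, now):
        self._expire(now)
        # Scans all counters on every query: O(n log k) for n distinct keys.
        return heapq.nlargest(k, self.counts.items(), key=lambda item: item[1])


w = SlidingWindowTopK(window=60)
w.add(0, "a")
w.add(10, "a")
w.add(20, "b")
print(w.top(2, now=30))  # [('a', 2), ('b', 1)]
w.add(70, "b")           # both "a" events are now at least 60 time units old
print(w.top(2, now=70))  # [('b', 2)]
```

Both the event log and the counter table live in memory here, which is exactly what stops working in the situations described next.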
But as soon as you have millions of events per day, wish to aggregate over days, weeks, or even months, and potentially have several million different types of events, the problem gets quite complicated.
You may end up in such a situation faster than you think:
- Event rates may rise beyond what you originally envisioned (in particular if you’re successful ;-)).
- The number of different event types may explode. This might be because the underlying sets are already large (say, IP addresses, or users in a social network), or because you are tracking combinations of them, so that the sizes multiply.
Let’s dig deeper to better understand why this problem quickly gets hard.
One thing to keep in mind is that the other solutions we will discuss still involve some amount of coding. So if you compare streamdrill against Hadoop, you would need to do a non-trivial amount of coding for Hadoop: Hadoop is a general-purpose framework that takes care of the scaling, but it doesn’t solve the top-k problem out of the box.
| | streamdrill | Standard SQL | Counters | Stream Processing |
|---|---|---|---|---|
| solves top-k out of the box | ✓ | ✕ | ✕ | ✕ |
| focusses computation on "hot set" | ✓ | ✕ | ✕ | ✕ |
| memory based for high throughput | ✓ | ✕ | ✕ | ✓ |
| scales to cluster | (✓ ⌚) | ✕ | ✓ | ✓ |
| exact results | (✓ ⌚) | ✓ | ✓ | ✓ |

✓ = yes, ✕ = no, (✓) = possible, (✓ $$$) = possible, but expensive, (✓ ⌚) = possible, but not yet.
Approach 1: Store and crunch later
Let’s start with a traditional approach based on an SQL database. You would create a table with a timestamp column and one column for each field in your data, and then pipe each event into the database. To get the counts, you would run something like

```sql
SELECT count(*) FROM events WHERE timestamp >= '2012-12-01 00:00' AND timestamp < '2013-01-01 00:00'
```

potentially also grouping to focus on certain types of events, and adding an ORDER BY count(*) DESC clause to get the most active elements.
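To make this concrete, here is a small self-contained sketch using Python’s built-in sqlite3 module with an in-memory database. The table layout (a timestamp column plus a single event_type column) and the sample rows are hypothetical. The query combines the time-window filter with GROUP BY, ORDER BY count(*) DESC and LIMIT. Note that the database still has to scan and aggregate every row in the window on each query, even though only the top few results are returned.

```python
import sqlite3

# Hypothetical schema: one row per event, with an ISO-formatted timestamp
# (so string comparison matches chronological order) and an event type.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (timestamp TEXT, event_type TEXT)")
conn.executemany(
    "INSERT INTO events (timestamp, event_type) VALUES (?, ?)",
    [
        ("2012-11-30 12:00", "/about"),  # outside the December window
        ("2012-11-30 13:00", "/about"),  # outside the December window
        ("2012-12-01 09:15", "/home"),
        ("2012-12-05 17:40", "/home"),
        ("2012-12-24 08:05", "/home"),
        ("2012-12-10 11:30", "/about"),
        ("2012-12-31 23:59", "/about"),
        ("2012-12-20 14:00", "/blog"),
    ],
)

TOP_K_QUERY = """
    SELECT event_type, count(*) AS n
    FROM events
    WHERE timestamp >= ? AND timestamp < ?
    GROUP BY event_type
    ORDER BY n DESC
    LIMIT ?
"""


def top_k(conn, start, end, k):
    """Return the k most frequent event types with start <= timestamp < end."""
    return conn.execute(TOP_K_QUERY, (start, end, k)).fetchall()


print(top_k(conn, "2012-12-01 00:00", "2013-01-01 00:00", 2))
# prints [('/home', 3), ('/about', 2)]
```

In this sketch the upper bound is the exclusive start of the next month, so the event at 2012-12-31 23:59 is still counted.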
There are a number of problems with this approach:
- If you use a normal disk-based database, you will only be able to add a few hundred, at best a few thousand, events per second.
- As you add more data, your database will become slower over time (and you will be adding a lot of data!). Leo has a number of nice benchmarks on his blog for MongoDB and Cassandra insertion performance.
- Just adding the data is not enough; you also need to crunch all of it to compute the activities. And the longer the time window, the longer the query will take to run.
- While the query runs, there will be considerable load on your server, making the addition of events even slower.
- Eventually, things will get so slow that your results are already a few minutes or even hours old by the time you get them. Hardly real-time.
What’s more, you’re probably only interested in the top 100 active elements, so most of the computation is spent on data you’re not interested in.
In other words, putting the data into some big database and crunching it later won’t really scale. If you’ve got a lot of money to spare, you can employ some form of clustering using MapReduce or a similar approach to bring down the computation times, but the bottom line is the same: you crunch a lot of data which you don’t really need, and the problem only becomes harder as you get more and more data. And “harder” also means a lot of coding and operations work (which you don’t have if you just use streamdrill ;)).
Approach 2: Just keeping the counters
So just storing the data away and crunching it later won’t work. How about doing the counting on the spot instead? That way, you wouldn’t have to store all those duplicate events. Note that just keeping the counters isn’t sufficient: you also need to maintain a global index so that you can quickly identify the top-k entries.
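A minimal in-memory sketch of “counters plus index” might look like the following Python class. It is a hypothetical illustration, not streamdrill’s data structure: besides the per-key counters, it groups keys into buckets by their current count and keeps the distinct counts in a sorted list, so a top-k query only walks the highest buckets instead of sorting all counters. It handles positive increments only; supporting time windows would additionally require decrementing or decaying counts.

```python
import bisect
from collections import defaultdict


class TopKCounter:
    """Exact counters plus an index ordered by count (hypothetical sketch)."""

    def __init__(self):
        self.counts = {}                 # key -> current count
        self.buckets = defaultdict(set)  # count -> keys having exactly that count
        self.levels = []                 # sorted list of distinct counts in use

    def _remove_from_bucket(self, key, count):
        bucket = self.buckets[count]
        bucket.discard(key)
        if not bucket:
            # Last key with this count: drop the bucket and its level.
            del self.buckets[count]
            del self.levels[bisect.bisect_left(self.levels, count)]

    def _add_to_bucket(self, key, count):
        if count not in self.buckets:
            bisect.insort(self.levels, count)
        self.buckets[count].add(key)

    def add(self, key, n=1):
        # Every update touches both the counter and the index (n must be >= 1).
        old = self.counts.get(key, 0)
        if old:
            self._remove_from_bucket(key, old)
        new = old + n
        self.counts[key] = new
        self._add_to_bucket(key, new)

    def top(self, k):
        # Walk the buckets from the highest count downwards.
        result = []
        for count in reversed(self.levels):
            for key in sorted(self.buckets[count]):  # sort ties for stable output
                result.append((key, count))
                if len(result) == k:
                    return result
        return result


c = TopKCounter()
for event in ["a", "b", "a", "c", "a", "b"]:
    c.add(event)
print(c.top(2))  # [('a', 3), ('b', 2)]
```

Keeping the index consistent on every single update is the part that becomes expensive once the counters have to live in a database instead of memory.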
Using a complex event processing framework like Esper would also be a way to reduce the coding load, as Esper comes with a nice query language which lets you formulate averages over time windows in a compact way.
Let’s assume your data doesn’t fit into memory anymore (otherwise it wouldn’t be Big Data, right?). One option is to again store the counters in a database. However, just as in the previous approach, this restricts the number of updates you can handle. Also, you will generate a lot of changes on the database, and not all databases handle that amount of write throughput gracefully. For example, Cassandra only marks old entries for deletion and cleans them up during compaction. In our experience, such compactions will eventually take hours and put significant load on your system, cutting the throughput in half.
And again, most of the time is spent on elements which will never make the top-k entries.
Approach 3: Stream processing frameworks
Instead of keeping counters in a database, you could also try to scale out using a stream processing framework like Twitter’s Storm or Yahoo’s S4. Such systems let you define the computation in the form of small worker threads which the framework then distributes over a cluster automatically, and they keep the counters in memory.
While this looks appealing (and in fact allows you to scale to several hundred thousand events per second), note that it only solves the counting part, but not the global index of all activities. Computing that index in a way that scales is non-trivial. You can collect the counter updates in a single worker thread which then maintains the index, but what if the index doesn’t fit into memory? You could partition the index, but then you’d have to aggregate the partial results to answer queries, and you’d have to implement this yourself, which again means a lot of complexity. The above stream processing frameworks also don’t come with easy support for queries, so you’d need to build some infrastructure to collect the results yourself.
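The aggregation step for a partitioned index can be sketched as follows. This hypothetical Python example stands in for a cluster with a list of in-process Counter objects: it hash-partitions keys so that each key is counted on exactly one partition, lets every partition report its local top-k, and merges those candidates. Because each key’s full count lives in a single partition, the merged result is exact. If the same key could be counted on several partitions (for example with round-robin routing), merging local top-k lists would no longer be enough, and you would need the partial counts of every candidate from every partition.

```python
import heapq
import zlib
from collections import Counter

NUM_PARTITIONS = 4  # hypothetical number of worker partitions


def partition_of(key):
    """Route a key to a partition; crc32 keeps the routing stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def count_partitioned(events):
    """Simulate the counting workers: each partition keeps its own counters."""
    partitions = [Counter() for _ in range(NUM_PARTITIONS)]
    for key in events:
        partitions[partition_of(key)][key] += 1
    return partitions


def global_top_k(partitions, k):
    """Merge the local top-k lists reported by each partition."""
    candidates = []
    for local in partitions:
        candidates.extend(local.most_common(k))
    return heapq.nlargest(k, candidates, key=lambda item: item[1])


events = ["x"] * 5 + ["w"] * 4 + ["y"] * 3 + ["z"]
print(global_top_k(count_partitioned(events), 2))  # [('x', 5), ('w', 4)]
```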
And again, you also have a lot of computation for elements which will never show up in your top 100.
While conceptually simple (in the end, you just count, right?), the top-k problem which streamdrill addresses becomes hard once there are more things to count than fit into memory and the event rate is higher than what you can write to disk.
Let’s conclude by discussing some of the differences between streamdrill and the other approaches:
As streamdrill is memory based, all the data is lost when streamdrill crashes or is restarted. The engine already has functionality to write snapshots of all the data, but it isn’t available via the streamdrill API yet.
Right now, streamdrill does not support clustering. We just haven’t found it necessary so far, but it’s something that is possible and will be included soon.
Finally, as I’m going to explain in more depth in the next post, streamdrill is based on approximate algorithms which trade exactness for performance. Again, if exactness is really an issue, you can get it by combining streamdrill with one of the other technologies. This is possible, but not our top priority for now.
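Without anticipating streamdrill’s own approach, the tradeoff can be illustrated with a different, well-known approximate technique: the Space-Saving algorithm, sketched here in Python. This is not streamdrill’s implementation. It keeps at most capacity counters; when a new key arrives and all counters are in use, it evicts the key with the smallest count and lets the newcomer inherit that count plus one. Memory stays bounded no matter how many distinct keys the stream contains, at the price that a monitored key’s count may be overestimated by up to its recorded error.

```python
class SpaceSaving:
    """Space-Saving approximate top-k with a fixed number of counters (sketch)."""

    def __init__(self, capacity):
        self.capacity = capacity  # maximum number of monitored keys (>= 1)
        self.counts = {}          # key -> estimated count (never an underestimate)
        self.errors = {}          # key -> maximum possible overestimation

    def add(self, key):
        if key in self.counts:
            self.counts[key] += 1
        elif len(self.counts) < self.capacity:
            self.counts[key] = 1
            self.errors[key] = 0
        else:
            # Evict the key with the smallest count; the newcomer inherits it.
            victim = min(self.counts, key=self.counts.get)
            floor = self.counts.pop(victim)
            del self.errors[victim]
            self.counts[key] = floor + 1
            self.errors[key] = floor

    def top(self, k):
        return sorted(self.counts.items(), key=lambda item: item[1], reverse=True)[:k]


s = SpaceSaving(capacity=3)
for key in ["a"] * 10 + ["b"] * 6 + ["c", "d", "e", "f"]:
    s.add(key)
print(s.top(2))  # [('a', 10), ('b', 6)]
```

Here the frequent keys a and b are counted exactly, while f is reported with a count of 4 and an error bound of 3, although it occurred only once.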
In the next post, I’ll explain how streamdrill approaches the problem.
Published at DZone with permission of Mikio Braun, DZone MVB.