Storing and Querying Trillions of Events
Storing and Querying Trillions of Events
Last time, we conquered the daunting task of processing billions of events per day. This time, we're doing the unthinkable: processing trillions of events!
Join the DZone community and get the full member experience.Join For Free
This is the second post in series describing our recent infrastructure and architecture transition in Plumbr. The first part focused on the event-capturing part of the architecture. In the current post, we are analyzing how the captured events are stored and are later accessed via the user interface. This post walks you through the motivation for the change and describes the choices we made.
For the readers unfamiliar with what we do, I'll first give some background to give you context. Plumbr is designed to monitor every user interaction with an application. These interactions are captured by Agents deployed in the nodes. The captured interactions are sent to the Server, where they are stored to be queried later.
From such interactions, Plumbr captures different attributes to be stored and queried. This gave us the founding requirements for the data structure with different dimensions:
- Start and end timestamp of the interaction.
- Identity of the user performing the interaction.
- The operation accomplished (add item to shopping cart, create new invoice, etc.).
- Outcome of the interaction (successful, slow, or failed).
- For unsuccessful interaction the root cause(s) in source code.
In addition, every interaction has a number of metrics. An example of such a metric can be the total duration of the interaction in milliseconds.
Besides the structure of the data, the data access use cases are relevant when picking the storage. Following are some examples of how our users access the dataset:
- Show me the daily active users trend for a particular application over the past month.
- What were the top three root causes affecting my site performance last week?
- Compare the current and last week performance of the checkout operation. Present the results via comparing the latency distributions for both periods.
Last category of requirements to take into account was (by no surprise) the non-functional requirements part. From the various NFRs, the trickiest one to fulfill was to make sure we could quickly access vast amounts of data. We had to answer the questions like above from a dataset containing multiple trillions of events. Such answers were to be given in under few seconds.
The structural and access patterns of the data made it obvious that we are dealing with a textbook definition of time-series data. After acknowledging the fact, it became painfully obvious that the original decision to store everything in a relational database might not have been the best decision.
We went searching for a new storage solution suitable for time-series data. After testing InfluxDB, Cassandra, MonetDB, and Druid against the requirements, we ended up with Druid. In the following sections, I will walk you through the most important concepts in Druid allowing us to fulfill the requirements.
Plumbr answers analytical questions. This makes it possible to base the answers on aggregations instead of individual events. Understanding this, we configured Druid to perform data roll-up during data ingestion. Roll-ups allow us to shift the cost of (some) aggregations and computations to data storage phase instead of the data access phase.
If this sounded too complex, check out the following example. Let's use the following five events representing two different event types (logging in and paying an invoice) all taking place between 12:20 and 12:21 on the same day:
Now, we can roll these five events up to just two entries in the Druid storage:
|Range||Event type||Event count|
|12:20 – 12:21||login()||3|
|12:20 – 12:21||pay()||2|
As a result of the roll-up, we avoided storing individual events. We were able to accomplish this thanks to the event characteristics: instead of storing individual events we rolled the events up to just two individual values in Druid with pre-computed aggregates. The benefit of the roll-up is measurable both in terms of reduced storage requirements and the speedup of the queries.
In our case, the outcome of the roll-up is the reduction of the raw events by ten to hundred-fold depending on the particular application we end up monitoring. The price we had to pay for this is also clear. The minimal granularity of the data access operations is capped at one minute.
Apparently, time-series are…well, dependent on time. So, we have a continuous series of one-minute buckets containing rolled-up data. Most of the queries on such buckets of data perform simple associative aggregations (sum, avg, max, and alike).
The associative nature of the aggregations means that Druid can split the original query into separate chunks, run those subqueries in parallel on multiple nodes and then just combine partial results to calculate the final answer. To give you a better idea about this, let’s consider the following example:
User requests the system to “give me the list of the 10 most used endpoints from the www.example.com application during the last seven days.”
Instead of executing the original query in a single node, Druid Broker will split the query into sub-queries, each requesting data from one day from the seven-day period and executing each sub-query in a different Historical node:
After all the nodes have responded, all that remains is to aggregate the results in the Broker and send it back to the client sending the request.
If this sounds like an implementation of the famous MapReduce algorithm, then you are correct; it indeed is. As a result, you are greeted with significant query speedup with zero effort from the developers. A side effect of the approach is getting close to linear horizontal scaling from your infrastructure. Just add more servers to your cluster, make sure partitioning in time is configured according to usage patterns, and Druid will take care of the rest.
Real-Time Data vs. Historical Data
Druid has a built-in separation of concerns between serving the historical and real-time data. In our case, “real-time” translates to “last hour.” As we receive and process data from Plumbr Agents via different microservices, Druid will be constantly fed with new data points:
A dedicated indexing node (Indexer) consumes the incoming feed. The very same Indexer will be responsible for answering all (sub)queries about the most recent data from the last hour.
Once per hour, this indexing node converts the raw feed into rolled up data and hands it off to nodes where the data will be stored. These nodes are called Historical nodes. The approach allows Druid to efficiently query two datasets with different characteristics:
- Recent data, which is still likely to be changed is queried from raw events in the Indexer node.
- Older data, which is not expected to change, is queried from Historical nodes where the raw events are already rolled up.
The downside of the approach is the write-only format for the historical data. Making changes to the data in historical nodes is not possible. The only way to update the data is the regenerate entire segments from raw event stream
The data processing and storage changes took us six months to implement, test, and roll out. We are still polishing the outcome; major changes like this do not tend to expose the full complexity of the whiteboard and tests. I myself have had my fair share of 2 a.m. alerts when something has not been working as expected.
I cannot even imagine the life without this new architecture. The synchronous processing on the monolith backed by a relational database feels like a nightmare from a distant past I am trying to forget.
Published at DZone with permission of Nikita Salnikov-Tarnovski , DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.