Distributed Data Querying With Alluxio
This blog is about how I used Alluxio to reduce p99 and p50 query latencies and optimized the overall platform costs for a distributed querying application.
I walk through the product and architecture decisions that led to our final architecture, discuss the tradeoffs, share some statistics on the improvements, and outline future improvements to the system.
The product I worked on was a business analytics dashboard. After logging in, users had access to hundreds of metrics that they could “slice and dice” in any way they found interesting. The overall corpus of data users could query was in the neighborhood of 300 TB, but each user had access to datasets between 100 GB and 1 TB. The application supported around 16,000 concurrent requests across hundreds of unique datasets. While most of the queries users performed were aggregations (~96%), the endless permutations of queries meant that most of them had to run over that user’s entire dataset rather than over a pre-aggregated summary of it.
Before using Alluxio, we directly queried user data stored in S3 with our Apache Spark cluster running on Mesos. We issued queries via Apache Livy, which provided a friendly HTTP interface to manage the Spark jobs and their lifecycle, and used Spark’s JDBC/Thrift interface to perform live queries on the data. We wrote a simple API layer between Livy and the front-end application to translate interactions from the UI into Spark queries and to manage their lifecycle. I illustrated this in the figure below.
It’s worth going into a bit of detail about how we managed the lifecycle of the Spark tables. Effectively, each user had their own in-memory tables in Spark. These tables were created by materializing data from S3 into Spark tables with the .saveAsTable method. User data was stored in one embarrassingly wide table. Once the data was in Spark, the API server sent JDBC queries to Spark, which executed them and sent the results back to the API server to return to the UI.
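As a rough illustration of the materialization step described above, here is a minimal sketch in Python. It assumes an existing SparkSession is passed in as `spark`, that the data is stored as Parquet (the format is not stated here), and that each user gets one table named by a hypothetical `table_name_for` helper. It is not the code we ran in production.

```python
import re


def table_name_for(user_id: str) -> str:
    """Build a Spark-safe table name for one user (hypothetical naming scheme)."""
    safe = re.sub(r"[^0-9A-Za-z_]", "_", user_id)
    return f"user_{safe}"


def materialize_user_table(spark, user_id: str, source_path: str) -> str:
    """Read one user's wide table from storage and save it as a Spark table.

    `spark` is an existing SparkSession. Parquet is an assumption made for
    this sketch; swap in whichever reader matches the stored format.
    """
    table = table_name_for(user_id)
    df = spark.read.parquet(source_path)
    # Overwrite so that a refresh replaces the previous copy of the table.
    df.write.mode("overwrite").saveAsTable(table)
    return table
```

A call such as `materialize_user_table(spark, "acme-42", "s3a://example-bucket/acme-42/")` would create a table the JDBC interface can then query by name; the bucket and user id are placeholders.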
Most of the complications with this architecture arose when the Spark tables needed to be refreshed. Reading data from S3 was an expensive operation, and we tried to do it only when absolutely necessary. We built a simple system on top of our S3 upload pipeline that sent state changes in the S3 data to our API server. The API server kept track of the state of each S3 path and used some logic to determine when to refresh the data in Spark. When this was orchestrated perfectly, users did not notice. However, if we needed to update the data to serve a user query, or if the data was being updated at the same time a user query was sent, latencies would balloon.
As for the performance characteristics of this system, our average query latency was 5 seconds and our 99th percentile query latency was 20 seconds. The difficulty with this measurement is that not all queries are created equal. If the data was already in Spark tables and didn’t need to be refreshed, most queries saw roughly the average latency. When the data needed to be refreshed, however, users experienced unacceptably high latency. My task was to reduce that tail latency, and I looked for technologies that would help us do this with the least overhead.
- Reduce time transferring data from S3 to Spark.
- Decrease p99 latency.
- Stretch: Decrease average latency.
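To make the latency goals concrete, here is a small sketch of how a handful of slow refresh queries can dominate p99 while barely moving the median or the mean. The sample latencies are made up for illustration and are not our measurements; the percentile uses the nearest-rank definition, which is only one of several common conventions.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample with at least pct% of
    all samples at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


# Made-up latencies in seconds: 97 queries served from warm Spark tables
# and 3 queries that had to wait for a table refresh.
latencies = [4.0] * 97 + [20.0] * 3

p50 = percentile(latencies, 50)
p99 = percentile(latencies, 99)
mean = sum(latencies) / len(latencies)
```

In this toy data only 3% of queries are slow, yet p99 lands on the slow value, which is why the refresh path was the thing to attack.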
I evaluated many systems to accomplish the goals of this project, including both open source and proprietary solutions. One of the most significant influences on the technology we chose for this architecture enhancement was our requirement that each tenant’s data be stored separately. With this requirement, we eliminated most database-oriented solutions because running a fleet of databases, one per client, was more than we wanted to take on at the time. The most natural solution was a cache in front of S3 where we could keep the data on disk separated by user and then materialize it in Spark as separate in-memory tables. We were also resource constrained, and re-writing our entire application to work with a new datastore wasn’t something we were interested in taking on.
Alluxio is a distributed data layer designed for in-memory workloads. In the simplest sense, it’s a distributed in-memory cache that you can put in front of your object stores and distributed file systems. I first learned about Alluxio being used at Didi and Baidu from conference talks I attended. I searched the internet relentlessly to find something bad said about Alluxio and I couldn't find anything substantial other than it being written in Java (eye roll). Around this time I put together a proof of concept to see how easy it was to cache objects and invalidate the cache and Alluxio checked all the boxes. To start, we wanted to see how much performance we could squeeze out of just adding Alluxio nodes to our architecture without doing anything clever. We were happy with the initial tests and wrote out some requirements for adding Alluxio without any new complications to an already complicated system.
Optimizing Cache Strategy
With Alluxio added, the weight of missing an opportunity to move data from S3 to Spark was much lighter. Alluxio helped us in two meaningful ways. First, Alluxio could mount our S3 paths, so most of the state changes that occurred within the bucket didn’t need to be orchestrated by our cache warming system. Second, reading data from Alluxio rather than S3 was, on average, 10x faster. In effect, if we didn’t move data from Alluxio to Spark in an optimized, time-sensitive way, it wasn’t a big deal because those tables could be materialized quickly. At first, we didn’t do much to optimize our Alluxio usage. Later, to optimize cost and avoid caching several TBs of data, we mounted file paths for some of our heaviest use cases, which eased the cognitive and computational overhead of warming caches for users with unpredictable patterns. For users who didn’t use the platform as much, we used a cache warming system that warmed the cache and created Spark tables when users were likely to use them and destroyed the cache otherwise. This change helped us reduce the overall size of our cluster, and we gained a comprehensive understanding of usage patterns over time. Additionally, the ease of adding and dropping Alluxio nodes and rebalancing the data let us experiment with Alluxio for our use cases without compromising user experience.
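The tiered strategy above (always mounted for the heaviest usage, warmed on demand for lighter users, evicted otherwise) can be sketched as a small decision function. Everything here is hypothetical: the `UsageStats` fields, the `heavy_threshold` default, and the idea that a predictor supplies `likely_active_soon` are assumptions made for illustration, not a description of our actual system.

```python
from dataclasses import dataclass
from enum import Enum


class CachePolicy(Enum):
    MOUNT = "mount"  # keep the S3 path mounted in Alluxio at all times
    WARM = "warm"    # warm the cache and build Spark tables ahead of use
    EVICT = "evict"  # destroy the cache until the user is likely to return


@dataclass
class UsageStats:
    queries_last_7d: int      # recent query volume for this user's data
    likely_active_soon: bool  # e.g. derived from observed usage patterns


def choose_policy(stats: UsageStats, heavy_threshold: int = 1000) -> CachePolicy:
    """Pick a cache policy for one user's dataset.

    `heavy_threshold` is an arbitrary placeholder; a real value would come
    from observed usage and the cost of the memory-optimized nodes.
    """
    if stats.queries_last_7d >= heavy_threshold:
        return CachePolicy.MOUNT
    if stats.likely_active_soon:
        return CachePolicy.WARM
    return CachePolicy.EVICT
```

Keeping the decision in one pure function like this makes it easy to replay historical usage against different thresholds before changing the cluster size.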
While it took a while to feel comfortable with our cache strategy, once it was refined our p99 latency dropped from 20 seconds to 7 seconds. Further, 98% of all queries ran against Alluxio rather than S3 directly. The remaining 2% went to S3 because they touched new data sources not yet mounted by Alluxio or optimized by our cache warming system.
On top of the latency improvements, customer satisfaction increased, as measured by contract renewals and a decrease in customer support tickets about slow query times.
We incurred some significant initial costs to run memory-optimized nodes for Alluxio. Over the months of using Alluxio, we were able to make some optimizations to our cache strategy based on usage patterns and avoid storing the entirety of our production data in Alluxio. Even with the increased cost of operating the new nodes, the operational burden increased only slightly and the customer experience improved dramatically.
One of the greatest strengths of Alluxio is its simplicity. It doesn’t try to do too much or be more than a virtual representation of data in underlying object storage. In this architecture, I rely heavily on Spark for the query and compute aspect, but Spark is far from the only compute engine I could use. One of the most significant costs of this system is the orchestration layer for Spark running on Mesos (we moved this stack to Kubernetes later on). Once supported by Alluxio, using S3 Select or Google Cloud Dataflow as a replacement for Spark would put the responsibility of compute orchestration and scheduling onto a cloud provider and would most likely decrease costs, since those services cycle down during low utilization.
This post covered a good deal of the orchestration we invested in. With a little more investment, we could also use the Alluxio Java Client, write our own S3 querying engine, and utilize the Alluxio SDK for our own caching strategy. This sounds more like an enjoyable engineering exercise than something we’d get any user experience or cost optimization gains from, but it’s interesting to think about. It’s unclear at this point what direction this project will go in, but it’s very likely Alluxio will be at the center of it.