Making Apache Spark the Most Versatile and Fast Data Platform Ever
Get a good grasp of SnappyData, an open-source project, and its positioning with respect to Apache Spark and other fast data systems.
We released SnappyData 1.0 GA last week. We couldn't be more proud of our team for their effort, dedication, and commitment. It has been a long, enduring test for most of the team, who have worked on distributed in-memory computing for many years, going back to our days at VMware and Pivotal. In the last year alone, the team closed about 1,000 JIRAs and improved performance 5-10x while supporting several customers and the community. The project added roughly 200K lines of source code and another 70K lines of test code.
If you have been following SnappyData closely, here is the list of improvements since the 0.9 release.
In this post, I will focus on the broader audience still trying to understand SnappyData and its positioning with respect to Spark and other fast data systems. I hope you will be intrigued enough to give SnappyData a try and star our Git repository.
While Apache Spark is a general-purpose engine for both real-time and batch big data processing, its core is designed for high throughput and batch processing. It cannot process one event at a time, perform point reads and writes, or manage mutable state. Spark has to rely on an external store to share data and to get low-latency access, updates, and high concurrency. And when it is coupled with an external store, its in-memory performance suffers heavily from the frequent data transfers needed to move data from that store into Spark’s memory. The schematic below captures this challenge.
Figure 1: Spark’s runtime architecture.
Figure 2: Spark enhanced with hybrid (row + column) store.
SnappyData adds mutability with transactional consistency to Spark, permits data sharing across applications, and allows a mix of low-latency operations (such as key-value reads and writes) with high-latency ones (such as an expensive aggregation query or an ML training job).
SnappyData introduces hybrid in-memory data management into Spark. SnappyData's Spark++ clusters can analyze streams as well as manage transactional and historical state for advanced insight, using Spark as the programming model. Running compute and data in a single cluster provides the best possible performance while avoiding the expensive stitching together of complex distributed systems that is the norm today.
The schematic above depicts this fusion of Spark with a hybrid in-memory database. Applications can submit Spark jobs to be executed inside the cluster, achieving performance gains of up to three orders of magnitude compared to a design where Spark is purely a computational layer. SnappyData also provides a very high-performance connector so that any Spark cluster can connect to SnappyData as a store. In this “smart connector” mode, unlike other connectors, SnappyData uses the same columnar format as Spark for data movement, along with several other optimizations, making it significantly faster than any store we have tried on the market today.
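To make the row-plus-column idea of Figure 2 concrete, here is a toy, single-process sketch of a hybrid table: a mutable row store keyed by ID serves point reads and updates, while a columnar view built from the same rows serves an aggregate. All names here (`Trade`, `Columns`, `HybridTable`, `avgPrice`) are hypothetical; this is not SnappyData's API or storage engine, and it ignores distribution, transactions, and concurrency.

```scala
import scala.collection.mutable

// Hypothetical record type for the sketch.
final case class Trade(id: Long, symbol: String, price: Double)

// Column-oriented copy of the table: one array per column.
final case class Columns(ids: Array[Long], symbols: Array[String], prices: Array[Double])

// Toy hybrid table: a row store for key-based access plus a columnar view for scans.
final class HybridTable {
  // Row store: point reads, inserts, updates, and deletes by primary key.
  private val rows = mutable.LinkedHashMap.empty[Long, Trade]

  def upsert(t: Trade): Unit = rows.update(t.id, t)
  def get(id: Long): Option[Trade] = rows.get(id)
  def delete(id: Long): Unit = rows.remove(id)

  // Build the columnar view from the current rows (rebuilt on every call in this sketch).
  def columnSnapshot(): Columns = {
    val rs = rows.values.toArray
    Columns(rs.map(_.id), rs.map(_.symbol), rs.map(_.price))
  }

  // Analytic query: scan only the symbol and price columns.
  def avgPrice(symbol: String): Option[Double] = {
    val c = columnSnapshot()
    var sum = 0.0
    var n = 0
    var i = 0
    while (i < c.prices.length) {
      if (c.symbols(i) == symbol) { sum += c.prices(i); n += 1 }
      i += 1
    }
    if (n == 0) None else Some(sum / n)
  }
}
```

A real store would maintain the columnar side incrementally rather than rebuilding it per query; the rebuild only keeps the sketch short.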
The details of SnappyData's architecture are described in this CIDR paper. The schematic below captures its key components and ecosystem.
Figure 3: SnappyData core, Spark Facade, and ecosystem. The components in the center, highlighted in light green, are some of the SnappyData extensions to the Spark runtime.
Deep Insights on Live Data
We have seen a flurry of stream processing engines (Google Dataflow, Flink, Storm, and IBM InfoSphere Streams, to name a few), all aimed at capturing perishable insights: insights derived from events that lose their value if not acted upon immediately.
They all support programming constructs that use either custom application logic or SQL to detect a condition or pattern within the stream. For instance, the most popular products selling right now, the top-K URLs, etc. can all be continuously captured as KPIs and made accessible to any application. For deeper insight, you often need to correlate current patterns with historical patterns or relate current patterns to other contextual datasets (Are sudden changes in temperature and pressure correlated with previously known patterns in a manufacturing device? Did the environment play a role?). Such insight, once presented to users, prompts other, deeper questions.
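A top-K URL KPI like the one above can be sketched as exact counts over a sliding time window. The sketch keeps every event whose timestamp is within the last `windowMillis` milliseconds and evicts older ones as time advances. `SlidingTopK` and its parameters are hypothetical, and it assumes non-decreasing timestamps; a real streaming engine would also deal with out-of-order events, partitioning, and fault tolerance.

```scala
import scala.collection.mutable

// Hypothetical sliding-window top-K counter over (timestamp, key) events.
final class SlidingTopK(windowMillis: Long) {
  private val events = mutable.Queue.empty[(Long, String)] // events in arrival order
  private val counts = mutable.HashMap.empty[String, Long]

  // Record one event; assumes timestamps never go backwards.
  def add(ts: Long, key: String): Unit = {
    events.enqueue((ts, key))
    counts.update(key, counts.getOrElse(key, 0L) + 1L)
    evict(ts)
  }

  // Keep only events with timestamp > now - windowMillis.
  private def evict(now: Long): Unit =
    while (events.nonEmpty && events.head._1 <= now - windowMillis) {
      val (_, k) = events.dequeue()
      val c = counts(k) - 1L
      if (c == 0L) counts.remove(k) else counts.update(k, c)
    }

  // Current top-k keys by count, ties broken alphabetically.
  def topK(k: Int): List[(String, Long)] =
    counts.toList.sortBy { case (key, c) => (-c, key) }.take(k)
}
```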
Often, this requires large-scale distributed joins, integration with disparate data sources, incremental training and evaluation of ML/DL models, and even support for instant visualization and data exploration tools that pose ad-hoc questions on all of this data. While some existing tools permit joining a stream with related datasets, we find that these related datasets are managed in enterprise repositories that are themselves large, diverse (NoSQL, SQL, text, etc.), and constantly changing. Imagine a telco operator placing location-sensitive offers or ads on mobile devices, which requires offers and subscriber profiles from CRM systems or partner systems. You have to combine a high-velocity CDR (call data record) stream with live datasets that reside in those CRM systems.
Executing a join against CRM systems in real time is not feasible. What you need is an engine that supports replicating changes from the CRM system (change data capture, or CDC) into a system that also manages the stream state. Moreover, this dependent state can itself be large. Most current solutions fall short: streaming systems focus on how to manage streams and offer only primitive state management.
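The CDC pattern for the telco example can be sketched in a few lines: change events from the CRM are applied to a locally managed copy of the subscriber table, and each incoming CDR is enriched by a local lookup instead of a remote call to the CRM. The types (`Subscriber`, `Upsert`, `Delete`, `Cdr`) and the opt-in rule are hypothetical simplifications, not a real CDC wire format.

```scala
import scala.collection.mutable

// Hypothetical subscriber profile replicated from a CRM system.
final case class Subscriber(id: String, plan: String, optedIn: Boolean)

// Hypothetical change events captured from the CRM (change data capture).
sealed trait Change
final case class Upsert(row: Subscriber) extends Change
final case class Delete(id: String) extends Change

// Hypothetical call data record arriving on the stream.
final case class Cdr(subscriberId: String, cellId: String)

final class ReplicatedProfiles {
  private val state = mutable.HashMap.empty[String, Subscriber]

  // Apply one CDC event to the local copy of the CRM table.
  def applyChange(change: Change): Unit = change match {
    case Upsert(row) => state.update(row.id, row)
    case Delete(id)  => state.remove(id)
  }

  // Stream-table join: enrich a CDR with the current profile; only opted-in subscribers qualify.
  def offerTarget(cdr: Cdr): Option[(String, String, String)] =
    state.get(cdr.subscriberId).filter(_.optedIn).map(s => (s.id, s.plan, cdr.cellId))
}
```

Because the join reads the replicated state, a profile change in the CRM affects the very next CDR once its change event has been applied.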
True analytics on live data requires a different design center that can consume any "live" data set in the enterprise, not just incoming streams.
SnappyData aims to offer true deep insight on any live data: event streams (such as sensor streams), continuous changes captured from enterprise databases (MySQL, Cassandra, etc.), historical data in memory, or even datasets in remote sources. For instance, you can run a continuous or interactive query that combines windowed streams, reference data, and even large datasets in S3 or HDFS. You can even use probabilistic data structures to condense extremely large datasets into main memory and make instant decisions using approximate query processing. The SnappyData design center is closer to a highly scalable MPP database that runs in memory and offers streaming support.
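As one example of a probabilistic data structure that condenses a large dataset into bounded memory, here is a minimal Count-Min Sketch: a fixed grid of counters that estimates how often a key occurred, never underestimating (with non-negative counts) but possibly overestimating when keys collide. This is a textbook sketch for illustration; the class name, the `width`/`depth` parameters, and the seeded MurmurHash3 hashing are our assumptions, not SnappyData's implementation.

```scala
import scala.util.hashing.MurmurHash3

// Minimal Count-Min Sketch: `depth` rows of `width` counters each.
final class CountMinSketch(width: Int, depth: Int) {
  require(width > 0 && depth > 0, "width and depth must be positive")
  private val table: Array[Array[Long]] = Array.ofDim[Long](depth, width)

  // Row r hashes with seed r, giving `depth` different hash functions;
  // floorMod keeps the bucket index non-negative.
  private def bucket(key: String, row: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key, row), width)

  // Add `count` occurrences of `key`: bump one counter in every row.
  def add(key: String, count: Long = 1L): Unit =
    for (r <- 0 until depth) {
      val b = bucket(key, r)
      table(r)(b) += count
    }

  // Estimated frequency: the minimum counter across rows.
  def estimate(key: String): Long =
    (0 until depth).map(r => table(r)(bucket(key, r))).min
}
```

With width w and depth d, the standard analysis bounds the overestimate by (e/w) times the total count with probability at least 1 - e^(-d), so memory stays fixed no matter how many distinct keys arrive.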
Figure 4 shows the current state of the art for a streaming system. Figure 5 depicts what a SnappyData-enabled system might look like.
Figure 4: Challenging to run complex analytics with streaming systems.
Figure 5: SnappyData’s architecture for live analytics
Don't All Modern Business Intelligence Tools Support Live Analytics?
While several BI tools on the market support live analytics by connecting directly to the source, most don't scale or perform well. The prevalent response in the BI tools community has been to pre-aggregate data or generate multi-dimensional cubes, cache these in memory, and drive the BI visualizations from this cache. Unfortunately, this doesn't work, for two reasons:
- These caches are read-only, take time to construct, and don't provide access to the live data we expect.
- Increasingly, analytics requires working with many data sources, fluid NoSQL data, and too many dimensions. It is far too complex and time-consuming to generate multi-dimensional cubes.
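The first problem is easy to reproduce in miniature: a pre-aggregated cube is a snapshot of a group-by computed at build time, so it drifts from the base data as soon as rows change. The sketch uses hypothetical names (`Sale`, `buildCube`, `liveTotal`) and plain Scala collections in place of a BI cache.

```scala
// Hypothetical fact row: a region and a sale amount.
final case class Sale(region: String, amount: Long)

object CubeStaleness {
  // Pre-aggregate once: total amount per region (a one-dimensional "cube").
  def buildCube(sales: Seq[Sale]): Map[String, Long] =
    sales.groupBy(_.region).map { case (r, rows) => r -> rows.map(_.amount).sum }

  // Live query: aggregate the current base data on demand.
  def liveTotal(sales: Seq[Sale], region: String): Long =
    sales.filter(_.region == region).map(_.amount).sum
}
```

Every new dimension multiplies the number of such precomputed aggregates, which is the second problem in the list.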
Figure 6 captures the challenges in existing tools for business intelligence.
Figure 6: Challenges in business intelligence tools.
SnappyData manages data in distributed memory, offering extreme performance through columnar data management, code generation, vectorization, and statistical data structures. And it natively supports all the Spark data types: nested objects, JSON, text, and of course, structured Spark SQL types.
The net effect is that any modern BI tool can access live data in streams, tables, and external data sources. Our goal is to offer interactive analytic performance on live big data, even across many concurrent users.
Why Not Spark Itself?
If you are versed in Spark, you might be wondering why this isn't possible in Spark itself. All in-memory state in Spark is immutable, so applications must relegate mutable state to external data stores like Cassandra. All analytical queries then require repeated copying and even complex deserialization, which makes them very expensive to execute. Furthermore, every query in Spark is scheduled as a job, often consuming all available CPU to execute one query at a time and hence offering low concurrency in query processing. Analytic workloads are often a mix of expensive scans and aggregations and drill-down questions that look for pinpointed datasets. Unlike Spark, SnappyData distinguishes low-latency queries from high-latency ones and ensures that application requests are handled with high concurrency.
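One generic way to keep cheap requests from queuing behind expensive ones is to route them to separate thread pools by latency class. The sketch below is only an illustration of that idea: the query types, pool names, and sizes are hypothetical, and it is not how SnappyData's scheduler is actually implemented.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical query kinds: key lookups versus full scans.
sealed trait Query
final case class PointLookup(key: Long) extends Query
final case class Scan(table: String) extends Query

final class QueryRouter(scanThreads: Int, lookupThreads: Int) {
  private val scanPool: ExecutorService = Executors.newFixedThreadPool(scanThreads)
  private val lookupPool: ExecutorService = Executors.newFixedThreadPool(lookupThreads)
  private val scanEc = ExecutionContext.fromExecutorService(scanPool)
  private val lookupEc = ExecutionContext.fromExecutorService(lookupPool)

  // Classify a query into a latency class.
  def poolFor(q: Query): String = q match {
    case PointLookup(_) => "lookup"
    case Scan(_)        => "scan"
  }

  // Run `work` on the pool for its class, so lookups never wait for a scan thread.
  def submit[T](q: Query)(work: => T): Future[T] =
    Future(work)(if (poolFor(q) == "lookup") lookupEc else scanEc)

  def shutdown(): Unit = { scanPool.shutdown(); lookupPool.shutdown() }
}
```

Even with a single scan thread occupied by a long-running scan, a point lookup still completes on its own pool.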
Working With Heterogeneous Data
Live data often arrives in a range of formats, such as text, XML, JSON, and custom objects in streams. Data can be self-describing, nested, or composed as a graph, and may not comply with a predefined schema.
SnappyData capitalizes on Spark’s ability to connect to virtually any data source and infer its schema. Here is a code snippet that reads a collection of JSON documents from MongoDB and stores it in memory as a column table in SnappyData. Note that there is no need to specify a schema for the table: internally, each JSON document in the collection is introspected for its schema, and these individual schemas are merged to produce the final schema for the table.
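import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._ // MongoDB Spark connector: adds loadFromMongoDB to a SparkSession
// snappySparkSession is an existing SnappySession
val mongoDataFrame = snappySparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection")))
mongoDataFrame.printSchema() // The schema is automatically inferred by sampling documents
mongoDataFrame.write.format("column").saveAsTable("aColumnTable") // Store the DataFrame as an in-memory column table
snappySparkSession.sql("select avg(product.retailPrice - discount) from aColumnTable").show() // Run queries on nested JSON objects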
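The merge step described above (introspect each document, then combine the per-document schemas) can be sketched without Spark. The sketch infers a flat field-to-type map per document and merges them, widening `IntType` with `DoubleType` to `DoubleType` and falling back to `StringType` on any other conflict. These type names are simplified stand-ins (not Spark's `org.apache.spark.sql.types`), and the widening rules are our assumptions; real connectors also handle nested structs, arrays, nulls, and many more types.

```scala
// Simplified field types for illustration (not Spark's type classes).
sealed trait FieldType extends Product with Serializable
case object IntType extends FieldType
case object DoubleType extends FieldType
case object BoolType extends FieldType
case object StringType extends FieldType

object SchemaMerge {
  type Schema = Map[String, FieldType]

  // Infer a flat schema for one document; nested values are out of scope here.
  def infer(doc: Map[String, Any]): Schema = doc.map {
    case (k, _: Int)     => k -> IntType
    case (k, _: Double)  => k -> DoubleType
    case (k, _: Boolean) => k -> BoolType
    case (k, _)          => k -> StringType
  }

  // Equal types stay; Int and Double widen to Double; anything else becomes String.
  def mergeType(a: FieldType, b: FieldType): FieldType = (a, b) match {
    case (x, y) if x == y                              => x
    case (IntType, DoubleType) | (DoubleType, IntType) => DoubleType
    case _                                             => StringType
  }

  // Union of fields across two schemas; a field seen in either is kept.
  def merge(a: Schema, b: Schema): Schema =
    (a.keySet ++ b.keySet).map { k =>
      val t = (a.get(k), b.get(k)) match {
        case (Some(x), Some(y)) => mergeType(x, y)
        case (Some(x), None)    => x
        case (None, Some(y))    => y
        case (None, None)       => StringType // unreachable: k comes from one of the key sets
      }
      k -> t
    }.toMap

  // Final table schema: fold the per-document schemas together.
  def inferAll(docs: Seq[Map[String, Any]]): Schema =
    docs.map(infer).foldLeft(Map.empty[String, FieldType])(merge)
}
```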
SnappyData applications can connect to, transform, and import/export data from S3, CSV, Parquet, Hadoop, NoSQL stores such as Cassandra, Elasticsearch, and HBase, relational databases, object data grids, and more. Essentially, the data model in SnappyData is the same as Spark's.
It is difficult to find a big data store that doesn’t claim interactive speeds. The common myth is that if data is managed in memory and combined with enough CPU (or even GPU), queries will execute at interactive speeds. Practitioners will note that this is often not the case. For instance, you may be surprised to find that Spark executes queries faster on Parquet files than on its in-memory cache (see the charts below). This has much to do with how data is laid out in memory and whether the code is written with modern multi-core CPU architectures in mind.
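To see why in-memory layout matters, compare summing one field over an array of row objects with summing a primitive array that holds only that column. On the JVM, the row version follows a reference to a separate heap object for every record, while the columnar version reads one contiguous `Array[Double]`, the kind of tight loop that is cache-friendly and amenable to vectorization. This is a toy illustration with hypothetical names (`Row`, `Layouts`); it is not a benchmark and says nothing about the numbers in the charts.

```scala
// Row layout: each record is a separate heap object reached through a reference.
final case class Row(id: Long, symbol: String, price: Double)

object Layouts {
  // Row-oriented scan: dereferences every object to read a single field.
  def sumRows(rows: Array[Row]): Double = {
    var s = 0.0
    var i = 0
    while (i < rows.length) { s += rows(i).price; i += 1 }
    s
  }

  // Column-oriented scan: one contiguous primitive array, no per-row objects.
  def sumColumn(prices: Array[Double]): Double = {
    var s = 0.0
    var i = 0
    while (i < prices.length) { s += prices(i); i += 1 }
    s
  }

  // Convert rows into the columnar representation of the price column.
  def toPriceColumn(rows: Array[Row]): Array[Double] = rows.map(_.price)
}
```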
SnappyData heavily leverages in-memory columnar storage (using row storage where appropriate), co-locates related datasets to avoid shuffles, and applies a range of other optimizations, as described in an earlier blog post.
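Co-location can be illustrated with hash partitioning: if two tables are partitioned by the same function of the join key, matching rows land in the same partition and the join can run partition by partition without moving rows between partitions. The sketch uses hypothetical `Customer`/`Order` types and a simple modulo partitioner; it shows the principle, not SnappyData's partitioning code.

```scala
final case class Customer(id: Int, name: String)
final case class Order(orderId: Int, customerId: Int, amount: Double)

object Colocation {
  // The same partitioning function is used for both tables, keyed on the customer ID.
  def partitionOf(key: Int, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Split rows into numPartitions buckets using the given key extractor.
  def partition[A](rows: Seq[A], numPartitions: Int)(key: A => Int): Vector[Seq[A]] = {
    val byPart = rows.groupBy(r => partitionOf(key(r), numPartitions))
    Vector.tabulate(numPartitions)(p => byPart.getOrElse(p, Seq.empty))
  }

  // Join each partition locally: a customer and its orders always share partition p.
  def localJoin(customers: Vector[Seq[Customer]], orders: Vector[Seq[Order]]): Seq[(String, Double)] =
    customers.indices.flatMap { p =>
      val byId = customers(p).map(c => c.id -> c.name).toMap
      orders(p).flatMap(o => byId.get(o.customerId).map(name => name -> o.amount))
    }
}
```

The design choice that makes this work is that both sides call the same `partitionOf` on the join key; partitioning either table by a different key would force a shuffle before the join.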
Here are the results of a simple performance benchmark on a MacBook Pro (4 cores) comparing Spark caching, Spark + Cassandra, and Spark + Kudu. We measure the load time for 100 million records as well as the time for both analytic and point queries. We use a single table with two columns (ID and symbol). We made sure all comparisons were fair and applied the basic documented tuning options.
Figure 7: Note that lower is better.
Figure 9: SnappyData and Spark did not define ID as a key in the above test.
Published at DZone with permission of Pierce Lamb.
Opinions expressed by DZone contributors are their own.