Cassandra Bulk CDC Extract
Our client was not able to achieve the performance they needed from their traditional RDBMS persistence platform. Their requirement was to have a highly available and sharded persistence platform. They were willing to sacrifice consistency for improved read and write performance. They made the decision to migrate to Cassandra to improve performance of the application. Consumers that were accessing data in the traditional RDBMS now had to figure out how to get the data they needed out of Cassandra. Specifically, the analytics team had to modify all of their ETL to source from Cassandra instead of the RDBMS.
The application APIs accessed the data in Cassandra by row key, one row at a time. Access by row key is very fast in Cassandra. ETL processes, by contrast, generally extract all data that has changed since the last extract was executed. This is called Change Data Capture (CDC), and done naively it requires reading every row in a batch to determine what has changed since the last extract. Querying Cassandra without a row key results in a full scan of the data, which is not an option in an operational production environment. In short, the data model was optimized for the operational data access paths, not the bulk extracts required for analytics.
The requirements for sourcing data from Cassandra for analytics were:
- Select only entities (rows) that have been updated or added since the last ETL batch run.
- Minimize dependencies on the development infrastructure.
- Minimize dependencies on the development team to alter API or data model.
- Minimize impact to the production Cassandra rings.
- Near real-time processing.
- Create a template solution for other sources that migrate to Cassandra.
- Minimize the amount of custom code. Most ETL teams do not have Java resources, and Java code is harder for an ETL team to maintain than Pentaho artifacts.
- Maintain a complete history of all changes to the operational data.
The biggest problem to solve was determining what had changed in Cassandra without querying the data and impacting production performance. Cassandra provides two pieces of functionality that together offer a simple and elegant solution. First, incremental backups can be enabled on the Cassandra ring. With incremental backups enabled, each SSTable (Cassandra's on-disk storage file) is hard-linked into a backups directory as it is flushed, so the backups contain only newly written data. SSTables are immutable: data is never updated in place. Instead, a new version is written and the old version is discarded later during compaction. This means that all data changes are guaranteed to be in the incremental backup SSTables. Second, the sstable2json tool is provided with Cassandra. This tool converts an SSTable into well-defined JSON output that can be processed further. Together, these satisfied the first requirement, Change Data Capture (CDC).
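As a rough illustration of the extraction step, the sketch below lists the SSTable data files in an incremental backups directory that have not yet been handled and runs sstable2json on each one. It is a minimal sketch under stated assumptions, not the scripts used on the project: the function names, the "already processed" set, and the output naming are hypothetical, and it assumes data files end in `-Data.db` and that sstable2json takes the SSTable path as its argument and writes JSON to standard output.

```python
import subprocess
from pathlib import Path


def find_backup_sstables(backups_dir, already_processed):
    """Return the *-Data.db files in backups_dir not yet processed.

    already_processed is a set of file names handled by earlier batches
    (hypothetical bookkeeping; a real job would persist this somewhere).
    """
    candidates = sorted(Path(backups_dir).glob("*-Data.db"))
    return [p for p in candidates if p.name not in already_processed]


def sstable2json_command(sstable_path, tool="sstable2json"):
    """Build the command line: the tool name followed by the SSTable path."""
    return [tool, str(sstable_path)]


def export_to_json(sstable_path, out_dir, tool="sstable2json"):
    """Run the tool and capture its standard output in a .json file."""
    out_path = Path(out_dir) / (Path(sstable_path).name + ".json")
    with open(out_path, "w") as out:
        subprocess.run(
            sstable2json_command(sstable_path, tool), stdout=out, check=True
        )
    return out_path
```

Tracking processed file names is only one possible design; moving or deleting each backup file after a successful export would serve the same purpose.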
Ultimately, an additional Cassandra ring was stood up for the analytics team. This ring was replicated from production and had incremental backups enabled. This architecture met the next three requirements because there was no negative performance impact to the production ring and it did not require any changes to be made by the development team.
Cassandra generates an enormous amount of data and hence a large volume of incremental backups. The question was how to process the JSON output efficiently. A series of scripts was written to process the SSTables, convert them to JSON and push the results to the Hadoop Distributed File System (HDFS) in a Hadoop cluster. A Pig macro was written to convert the JSON into a standard format:
- Row Key
- Column Name
- Column Value
- Column Timestamp
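The four-field format above can be illustrated with a short Python sketch. This is not the Pig macro itself, only the same flattening expressed in Python for readability. The input shape is an assumption: each exported row is taken to be an object with a "key" and a "columns" list whose entries start with name, value and timestamp. The actual JSON layout depends on the Cassandra version, and the sample row is invented.

```python
import json
from typing import Iterator, NamedTuple


class EavRecord(NamedTuple):
    row_key: str
    column_name: str
    column_value: str
    column_timestamp: int


def to_eav(rows) -> Iterator[EavRecord]:
    """Flatten exported rows into one record per column.

    Assumes each row looks like
    {"key": ..., "columns": [[name, value, timestamp, ...], ...]}.
    """
    for row in rows:
        for column in row["columns"]:
            # Ignore any trailing fields after the first three (assumed flags).
            name, value, timestamp = column[:3]
            yield EavRecord(row["key"], name, value, timestamp)


# Invented sample data in the assumed layout.
sample = json.loads("""
[{"key": "user1",
  "columns": [["email", "a@example.com", 1700000000000000],
              ["name", "Ann", 1700000000000001]]}]
""")
records = list(to_eav(sample))
```

Because every column becomes its own record, the same function works for any column family without knowing its columns in advance, which is what makes the format usable as a template.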
In essence, this produced a generic Entity Attribute Value (EAV) representation of the Cassandra row. The Pig scripts generated MapReduce jobs that were executed in parallel on commodity hardware in the Hadoop cluster, which provided the processing power needed for the large amount of data in the incremental backups. The Pig macro was wrapped by an entity-specific ETL Pig script that further transformed the data based on the contents of the row for a given entity. The results were stored in HDFS and partitioned by batch number (in the form YYYYMMDDHH24MISS.sssssss). This part of the solution met the last four requirements because:
- the Hadoop cluster is fast enough to provide near real-time processing,
- the Pig macro and architecture form a template that can be used on any Cassandra column family,
- Pig scripting is much easier to learn than writing Java code to create MapReduce jobs, and
- HDFS contains a history of all changes to the data in the operational system (Cassandra), including expiring and deleted data.
Traditional ETL was developed using Pentaho Data Integration to populate a dimensional data warehouse in an RDBMS. Pentaho has a plugin that can read data directly from HDFS files. Only aggregate data was loaded into the RDBMS. All of the detailed data is stored in HDFS and is accessible for ad hoc queries using Hive, Pig or other big data tools. This is a very flexible solution because the tables in the RDBMS can be recreated at any time from the data in HDFS, for example to change the grain of a fact table or add dimension attributes.
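To make the rebuild idea concrete, here is a hedged Python sketch of how the current state of each entity could be derived from the change history. It assumes the history is available as (row key, column name, column value, column timestamp) tuples and that the newest timestamp wins for each column. The function name is hypothetical, and deletes and expirations are deliberately ignored, although a real rebuild would have to account for them.

```python
def latest_values(records):
    """Collapse a change history into the newest value of every column.

    records: iterable of (row_key, column_name, column_value, timestamp).
    Returns {row_key: {column_name: column_value}}.
    Simplification: deleted and expired columns are not handled.
    """
    latest = {}
    for row_key, name, value, timestamp in records:
        key = (row_key, name)
        # Keep the value with the highest timestamp seen so far.
        if key not in latest or timestamp > latest[key][1]:
            latest[key] = (value, timestamp)

    rows = {}
    for (row_key, name), (value, _) in latest.items():
        rows.setdefault(row_key, {})[name] = value
    return rows


# Invented history: the email column of user1 changes once.
history = [
    ("user1", "email", "old@example.com", 100),
    ("user1", "name", "Ann", 100),
    ("user1", "email", "new@example.com", 200),
]
current = latest_values(history)
```

Filtering the history by timestamp before collapsing it would produce the state as of an earlier point in time, which is the kind of reprocessing that keeping the full detail in HDFS allows.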