Efficient Cassandra Write Pattern for Micro-Batching

The best way to write to a Cassandra cluster is through concurrent asynchronous writes. In cases where data exhibits strong temporal locality, speed can be improved further.

By John Georgiadis · May. 20, 15 · Interview
Cassandra is a strong candidate for storing streaming data such as time series, and is therefore typically used in combination with Apache Storm or Apache Spark.

The fastest option for writing to a Cassandra cluster is through concurrent asynchronous writes. However, in cases where data exhibits strong temporal locality, the performance can be further improved.

Stream Processing and Micro Batches

Both Storm Trident and Spark Streaming trade latency for throughput. Instead of processing individual messages as they arrive, they retrieve messages in small batches. The interval between consecutive batch retrievals is usually between 100 and 1000 ms. The batch size may vary from a few messages to several thousand messages.

Processing messages in batches allows aggregation and persistence to be implemented much more efficiently. For example, an event counter would be updated only once per batch, rather than each time a new message arrives. Since the batch size is configurable, the latency vs. throughput tradeoff can be balanced according to the application requirements. It is the potential correlation between messages in the same batch that can be leveraged to improve the write performance of Cassandra.
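The event-counter example can be sketched in plain Java. This is a minimal illustration, not code from Storm or Spark: the class name BatchCounter and the use of string event keys are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collapses a micro-batch of event keys into one count per key,
// so each counter needs a single update per batch instead of one per message.
final class BatchCounter {
    static Map<String, Long> countPerKey(List<String> eventKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : eventKeys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("login", "click", "click", "login", "click");
        // Five messages collapse into two counter updates (one per distinct key).
        System.out.println(countPerKey(batch));
    }
}
```

Each entry of the returned map would then become one write, which is where the saving over per-message updates comes from.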

Cassandra Table Sharding

Cassandra distributes a table’s data across a group of replica sets according to each row’s partition key. The partition key is part of the table’s primary key; the remaining parts constitute the ‘clustering’ key. The partition key identifies the replica set where the row will be stored, while the clustering keys provide unique naming for Cassandra’s extensible row columns.

Cassandra supports batch operations, where several queries are packed into the same logical operation. However, batches are not the fastest way to insert data into Cassandra. This is partly due to the atomicity guarantee of batches, and partly because the statements of a batch still have to be distributed to their respective replica sets according to their partition keys, and those replica sets may not include the node that the client sent the batch to.

The current version 2.1.5 of the Cassandra Java driver (and probably drivers for other languages) supports token-aware routing of statements. That is, when Session.execute(statement) is called, the driver will extract the partition key from the statement (based on the schema metadata), and by using a token map it will identify the replica set, i.e. the list of Cassandra nodes that store (or will store) the referenced partition. The driver will then return this replica set, optionally with the list of hosts randomly ordered to balance the workload across the nodes of the replica set. The driver will append additional hosts to the list of candidates based on the downstream chain of routing policies such as latency-based routing.
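To make the token map lookup more concrete, here is a toy model of a token ring in plain Java. It is a sketch under simplifying assumptions, not the driver's implementation: the class ToyTokenRing and its methods are hypothetical, String.hashCode() stands in for the partitioner's hash, and replicas are simply the next distinct nodes clockwise on the ring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (hypothetical): maps a token to the nodes that would replicate it.
final class ToyTokenRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int replicationFactor;

    ToyTokenRing(int replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    // Stand-in hash for illustration only; a real partitioner uses a different function.
    static long tokenOf(String partitionKey) {
        return partitionKey.hashCode();
    }

    List<String> replicasFor(String partitionKey) {
        return replicasForToken(tokenOf(partitionKey));
    }

    // The owner is the node with the smallest ring token >= the key's token,
    // wrapping around to the first node; further replicas follow clockwise.
    List<String> replicasForToken(long token) {
        List<String> replicas = new ArrayList<>();
        if (ring.isEmpty()) {
            return replicas;
        }
        Long start = ring.ceilingKey(token);
        if (start == null) {
            start = ring.firstKey();
        }
        collect(ring.tailMap(start, true).values(), replicas);
        collect(ring.headMap(start, false).values(), replicas);
        return replicas;
    }

    private void collect(Iterable<String> nodes, List<String> replicas) {
        for (String node : nodes) {
            if (replicas.size() < replicationFactor && !replicas.contains(node)) {
                replicas.add(node);
            }
        }
    }
}
```

With four nodes at tokens -100, 0, 100 and 200 and a replication factor of 3, a key whose token is 97 maps to the nodes at 100, 200 and -100, in that order.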

Asynchronous Writes

The fastest general-purpose approach to inserting data to Cassandra is described in Batch loading without the Batch keyword:

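List<ResultSetFuture> tasks = new ArrayList<>();

for (Statement s : list) {
    // Fire the write without waiting for the response.
    tasks.add(session.executeAsync(s));
    if (tasks.size() < 8000)
        continue;

    // Window is full: wait for every in-flight write before submitting more.
    for (ResultSetFuture t : tasks)
        t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
    tasks.clear();
}

// Wait for the writes left over from the last, partially filled window.
for (ResultSetFuture t : tasks)
    t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);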

Throughput is moderated to avoid overloading the server. In a streaming environment, the above code will be run by multiple threads depending on the parallelism set for the persistence phase. The statements will be BoundStatements derived from the same PreparedStatement template using the attributes of each message (or aggregate result).
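The code above moderates throughput by draining a full window of futures before submitting more. A different way to cap the load, shown here only as an alternative sketch and not as the pattern measured in this article, is to bound the number of in-flight writes with a semaphore. The example is driver-independent: BoundedAsyncWriter is a hypothetical class, and the asyncWrite function returning a CompletableFuture is an assumption standing in for whatever asynchronous call the client exposes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper: submits items through asyncWrite while keeping at most
// maxInFlight writes outstanding, and returns once all of them have completed.
final class BoundedAsyncWriter {
    static <T> void writeAll(List<T> items, int maxInFlight,
            Function<? super T, ? extends CompletableFuture<?>> asyncWrite) {
        Semaphore permits = new Semaphore(maxInFlight);
        for (T item : items) {
            // Blocks while maxInFlight writes are still pending.
            permits.acquireUninterruptibly();
            CompletableFuture<?> write;
            try {
                write = asyncWrite.apply(item);
            } catch (RuntimeException e) {
                permits.release();
                throw e;
            }
            // Free the slot on success or failure; errors are not surfaced in this sketch.
            write.whenComplete((result, error) -> permits.release());
        }
        // Taking every permit back means no write is still in flight.
        permits.acquireUninterruptibly(maxInFlight);
    }
}
```

Compared with the windowed loop, a slow response delays only one slot rather than the whole window, at the cost of a little more bookkeeping.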

Assuming a token-aware driver, each individual statement will be sent to the correct replica set.

This approach is far more efficient than sending batch statements. On a cluster with 4 nodes, replication factor (RF) 3, and batch/threshold size of 8000 operations, the results for 1M insertions are:

Pattern                    Writes/sec
Batch Statements           7100
Asynchronous Statements    13000

Token-Aware Batch Statement

Sending mixed-partition statements in the same batch is an anti-pattern. The Cassandra Java driver will not complain when it encounters this type of batch; it will use the first statement in the batch to calculate the list of hosts to try to send the batch to. In a 4-node cluster with RF=3, each node holds 75% of the partition keys. Assuming a uniform distribution of incoming partition keys, 1/4 of the statements in the batch will be sent to the wrong replica set.
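The 75% and 1/4 figures follow from simple arithmetic: with N nodes and replication factor RF, a node replicates RF/N of the keys, so 1 - RF/N of uniformly distributed keys are not local to it. A small sketch, assuming evenly balanced token ownership (the class and method names are made up for this example):

```java
// Hypothetical helper illustrating the arithmetic; assumes every node owns an equal
// share of the ring and that partition keys are uniformly distributed.
final class CoordinatorMiss {
    // Fraction of partition keys that a given node does NOT replicate.
    static double missFraction(int nodes, int replicationFactor) {
        int replicas = Math.min(replicationFactor, nodes);
        return 1.0 - (double) replicas / nodes;
    }

    public static void main(String[] args) {
        // 4 nodes, RF=3: each node holds 3/4 of the keys, so 1/4 are not local.
        System.out.println(missFraction(4, 3));
    }
}
```

The same formula suggests that the penalty grows with cluster size: for a fixed RF, a larger share of a mixed batch has to be forwarded by the coordinator.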

To circumvent the issue, statements need to be placed in multiple batches according to their replica set. Fortunately, the Java driver exports the call that associates a statement with the right replica set:

static final int RF = 3;

void run() {
    for (Statement s:list) {
        statements.add(s);
        if (statements.size() < 8000)
            continue;
        execute(statements);    
        statements.clear();
    }
    if (statements.size() != 0)
        execute(statements);
}

void execute(List<Statement> list) {
    List<List<Statement>> groups = splitByToken(session.getCluster(), list);
    for (List<Statement> group:groups) {
        BatchStatement batch = new BatchStatement(Type.UNLOGGED);
        batch.addAll(group);    
        session.execute(batch);
    }
}

List<List<Statement>> splitByToken(Cluster cluster, List<Statement> batch) {
    Map<Set<Host>,List<Statement>> batches = new HashMap<>();
    for (Statement s:batch) {
        Set<Host> hosts = new HashSet<>();
        int replicas = 0;

        Iterator<Host> it = cluster.getConfiguration().getPolicies(). 
            getLoadBalancingPolicy().newQueryPlan(s.getKeyspace(), s);
        while (it.hasNext() && replicas < RF) {
            hosts.add(it.next());
            replicas++;
        }

        List<Statement> tokenBatch = batches.get(hosts);
        if (tokenBatch == null) {
            tokenBatch = new ArrayList<>();
            batches.put(hosts, tokenBatch);
        }
        tokenBatch.add(s);
    }

    return new ArrayList<>(batches.values());
}

Token-aware batches are more efficient than batches that reference multiple partition keys. However, they are still not nearly as efficient as asynchronous execution of individual statements:

Pattern                         Writes/sec
Batch Statements                7100
Token-Aware Batch Statements    9100
Asynchronous Statements         13000

Temporal Locality

The use case where token-aware batches shine is when the data within each batch exhibits high temporal locality with respect to the partition key. That is, when multiple messages in a batch refer to the same partition key. One can go as far as choosing the partition key based on the attributes that exhibit temporal locality. There’s a downside to this approach as it tends to create wide rows. Experts have suggested ways to overcome this through row partitioning (Advanced Time Series with Cassandra).

Temporal locality can perhaps be better defined using cache performance terms. Thus P% locality in a batch corresponds to a P% cache hit rate, assuming the messages were placed in a cache according to their partition key. At 40% locality, the token-aware batch approach has comparable performance to asynchronous execution. For higher temporal locality, token-aware batches surpass the asynchronous execution pattern in terms of writes per second.
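One way to read this definition is as the fraction of messages in a batch whose partition key has already appeared earlier in the same batch. The sketch below computes that number; it assumes an unbounded cache that starts empty for each batch, and the class name BatchLocality is made up for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: measures the locality of a batch as a cache hit rate,
// where a "hit" is a message whose partition key was seen earlier in the batch.
final class BatchLocality {
    static double locality(List<String> partitionKeys) {
        if (partitionKeys.isEmpty()) {
            return 0.0;
        }
        Set<String> seen = new HashSet<>();
        int hits = 0;
        for (String key : partitionKeys) {
            // Set.add returns false when the key is already present, i.e. a cache hit.
            if (!seen.add(key)) {
                hits++;
            }
        }
        return (double) hits / partitionKeys.size();
    }

    public static void main(String[] args) {
        // Keys a, a, b, a, c: two of the five messages repeat an earlier key.
        System.out.println(locality(Arrays.asList("a", "a", "b", "a", "c")));
    }
}
```

Measuring this on real batches gives a rough indication of which side of the 40% break-even point a workload falls on.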

Pattern                                   0% Locality  20% Locality  40% Locality  60% Locality  80% Locality
Batch Statements (write/s)                7100         7500          8300          10600         16000
Token-Aware Batch Statements (write/s)    9100         10200         12600         17700         29400
Asynchronous Statements (write/s)         13000        13100         12700         13700         13200