Efficient Cassandra Write Pattern for Micro-Batching

The best way to write to a Cassandra cluster is through concurrent asynchronous writes. Where data exhibits strong temporal locality, write throughput can be improved further.

by John Georgiadis · May 20, 2015

Cassandra is a strong candidate for storing streaming data such as time series, and is therefore often used in combination with Apache Storm or Apache Spark.

The fastest option for writing to a Cassandra cluster is through concurrent asynchronous writes. However, in cases where data exhibits strong temporal locality, the performance can be further improved.

Stream Processing and Micro Batches

Both Storm Trident and Spark Streaming exchange latency for throughput. Instead of processing individual messages as they arrive, they retrieve messages in small batches. The interval between consecutive batch retrievals is usually between 100 and 1000 ms. The batch size may vary from a few messages to several thousand messages.

Processing messages in batches allows aggregation and persistence to be implemented much more efficiently. For example, an event counter would be updated only once per batch, rather than each time a new message arrives. Since the batch size is configurable, the tradeoff between latency and throughput can be balanced according to the application requirements. It is the potential correlation between messages in the same batch that can be leveraged to improve the write performance of Cassandra.
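The event counter example can be sketched in a few lines. This is a minimal illustration rather than code from a real topology: the class MicroBatchCounter, its methods, and the in-memory map standing in for the persisted counter are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: MicroBatchCounter and its method names are hypothetical.
class MicroBatchCounter {
    // Running totals, standing in for a persisted counter table.
    private final Map<String, Long> totals = new LinkedHashMap<>();
    // Number of updates issued against the store so far.
    private int updates = 0;

    // Pre-aggregate one micro-batch in memory, then apply one update per distinct key.
    void processBatch(List<String> eventKeys) {
        Map<String, Long> deltas = new LinkedHashMap<>();
        for (String key : eventKeys) {
            deltas.merge(key, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : deltas.entrySet()) {
            totals.merge(e.getKey(), e.getValue(), Long::sum);
            updates++;
        }
    }

    long total(String key) {
        return totals.getOrDefault(key, 0L);
    }

    int updates() {
        return updates;
    }
}
```

A batch of five events over two distinct keys results in two store updates instead of five.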

Cassandra Table Sharding

Cassandra distributes a table’s data across a group of replica sets according to each row’s partition key. The partition key is part of the table’s primary key, with the remaining parts constituting the ‘clustering’ key. The partition key identifies the replica set where the row will be stored, while the clustering keys provide unique naming for Cassandra’s extensible row columns.

Cassandra supports batch operations where several queries are packed into the same logical operation. However, batches are not the fastest approach to inserting data into Cassandra. This is partly due to the atomicity guarantee of batches, and partly because the statements of a batch still have to be distributed to their respective replica sets according to their partition keys. Those replica sets may not include the node that the client sent the batch to.

Version 2.1.5 of the Cassandra Java driver, current at the time of writing (and probably drivers for other languages), supports token-aware routing of statements. That is, when Session.execute(statement) is called, the driver will extract the partition key from the statement (based on the schema metadata), and by using a token map it will identify the replica set, i.e. the list of Cassandra nodes that store (or will store) the referenced partition. The driver will then return this replica set, optionally with the list of hosts randomly ordered to balance the workload across the nodes of the replica set. The driver will append additional hosts to the list of candidates based on the downstream chain of routing policies such as latency-based routing.
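The lookup from a partition key's token to its replica set can be illustrated with a toy token ring. This is a simplified model and not the driver's implementation: ToyTokenRing is a hypothetical class, tokens are plain integers supplied by the caller, and replicas are placed on consecutive nodes around the ring. A real cluster derives tokens with a partitioner and may use virtual nodes and rack-aware placement.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: ToyTokenRing is hypothetical and ignores partitioners and vnodes.
class ToyTokenRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>(); // token -> node
    private final int rf;

    ToyTokenRing(Map<Integer, String> tokenToNode, int rf) {
        ring.putAll(tokenToNode);
        this.rf = rf;
    }

    // Start at the first node token >= the key's token (wrapping around),
    // then walk clockwise until rf distinct nodes have been collected.
    List<String> replicasFor(int keyToken) {
        List<String> replicas = new ArrayList<>();
        if (ring.isEmpty()) {
            return replicas;
        }
        int wanted = Math.min(rf, new HashSet<>(ring.values()).size());
        Integer t = ring.ceilingKey(keyToken);
        if (t == null) {
            t = ring.firstKey();
        }
        while (replicas.size() < wanted) {
            String node = ring.get(t);
            if (!replicas.contains(node)) {
                replicas.add(node);
            }
            t = ring.higherKey(t);
            if (t == null) {
                t = ring.firstKey();
            }
        }
        return replicas;
    }
}
```

With four nodes and a replication factor of 3, every key maps to three of the four nodes, so two statements belong in the same token-aware batch only if they map to the same set of nodes.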

Asynchronous Writes

The fastest general-purpose approach to inserting data into Cassandra is described in "Batch loading without the Batch keyword":

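// Futures for writes that have been submitted but not yet confirmed.
List<ResultSetFuture> tasks = new ArrayList<>();

for (Statement s : list) {
    tasks.add(session.executeAsync(s));
    if (tasks.size() < 8000)
        continue;

    // The window is full: wait for every outstanding write before sending more.
    // getUninterruptibly throws TimeoutException if a write exceeds 10 s.
    for (ResultSetFuture t : tasks)
        t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
    tasks.clear();
}

// Wait for the final, partially filled window.
for (ResultSetFuture t : tasks)
    t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);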

Throughput is moderated to avoid overloading the server: once 8000 writes are outstanding, the loop waits for all of them to complete before submitting more. In a streaming environment, the above code will be run by multiple threads depending on the parallelism set for the persistence phase. The statements will be BoundStatements derived from the same PreparedStatement template using the attributes of each message (or aggregate result).
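The same moderation can also be expressed as a sliding window, where a new write is submitted as soon as any outstanding write completes. The sketch below is a variation on the loop above, not the code that was benchmarked. It uses CompletableFuture as a stand-in for the driver's future type, and BoundedAsyncWriter and the writeAsync parameter are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Illustrative only: BoundedAsyncWriter and writeAsync are hypothetical names.
class BoundedAsyncWriter {
    // Submits every item through writeAsync while keeping at most maxInFlight
    // writes outstanding. Returns the number of writes that completed successfully.
    static <T> int writeAll(List<T> items, int maxInFlight,
                            Function<T, CompletableFuture<Void>> writeAsync) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger succeeded = new AtomicInteger();
        for (T item : items) {
            permits.acquireUninterruptibly();      // blocks while the window is full
            CompletableFuture<Void> future;
            try {
                future = writeAsync.apply(item);
            } catch (RuntimeException e) {
                permits.release();                 // submission failed, free the slot
                throw e;
            }
            future.whenComplete((ignored, error) -> {
                if (error == null) {
                    succeeded.incrementAndGet();
                }
                permits.release();                 // free the slot for the next write
            });
        }
        // All permits are available again only when every write has completed.
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
        return succeeded.get();
    }
}
```

Compared with waiting for a full window of 8000 futures, this keeps the number of in-flight writes close to the limit at all times. Whether that improves throughput on a given cluster would need to be measured.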

Assuming a token-aware driver, each individual statement will be sent to the correct replica set.

This approach is far more efficient than sending batch statements. On a cluster with 4 nodes, replication factor (RF) 3, and batch/threshold size of 8000 operations, the results for 1M insertions are:

Pattern                    Writes/sec
Batch Statements           7100
Asynchronous Statements    13000

Token-Aware Batch Statement

Sending mixed-partition statements in the same batch is an anti-pattern. The Cassandra Java driver will not complain when it encounters this type of batch; it will use the first statement in the batch to calculate the list of hosts to try to send the batch to. In a 4-node cluster with RF=3, each node holds 75% of the partition keys. Assuming a uniform distribution of incoming partition keys, 1/4 of the statements in the batch will be sent to a node outside their replica set.
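The 1/4 figure follows from simple counting, which the sketch below reproduces. It assumes a toy ring of equally sized ranges where each range is stored on its owning node and the next RF - 1 nodes, so the fraction of keys a given coordinator does not hold is 1 - RF/N. MisroutedFraction is a hypothetical helper, not part of the driver.

```java
// Illustrative only: a toy ring with equal ranges and consecutive replica placement.
class MisroutedFraction {
    // Fraction of uniformly distributed partition keys for which a fixed
    // coordinator node is not one of the rf replicas.
    static double fraction(int nodes, int rf) {
        int coordinator = 0;
        int missed = 0;
        for (int owner = 0; owner < nodes; owner++) {
            boolean coordinatorHoldsRange = false;
            for (int k = 0; k < rf; k++) {
                if ((owner + k) % nodes == coordinator) {
                    coordinatorHoldsRange = true;
                }
            }
            if (!coordinatorHoldsRange) {
                missed++;
            }
        }
        return (double) missed / nodes;
    }
}
```

For 4 nodes and RF=3 the result is 0.25. Under the same assumptions, the fraction grows as the cluster grows relative to the replication factor, for example 0.5 for 6 nodes and RF=3.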

To circumvent the issue, statements need to be placed in multiple batches according to replica set. Fortunately, the Java driver exports the call that associates a statement with the right replica set:

static final int RF = 3;

// Statements accumulated for the current window.
List<Statement> statements = new ArrayList<>();

void run() {
    for (Statement s : list) {
        statements.add(s);
        if (statements.size() < 8000)
            continue;
        execute(statements);
        statements.clear();
    }
    // Flush the final, partially filled window.
    if (statements.size() != 0)
        execute(statements);
}

// Sends one unlogged batch per replica set.
void execute(List<Statement> list) {
    List<List<Statement>> groups = splitByToken(session.getCluster(), list);
    for (List<Statement> group : groups) {
        BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
        batch.addAll(group);
        session.execute(batch);
    }
}

// Groups statements by the set of hosts that store their partition.
// Relies on a token-aware load balancing policy, so that the first RF hosts
// of each query plan are the replicas. A Set is used as the map key because
// the policy may return the replicas in random order.
List<List<Statement>> splitByToken(Cluster cluster, List<Statement> batch) {
    Map<Set<Host>, List<Statement>> batches = new HashMap<>();
    for (Statement s : batch) {
        Set<Host> hosts = new HashSet<>();
        int replicas = 0;

        Iterator<Host> it = cluster.getConfiguration().getPolicies()
            .getLoadBalancingPolicy().newQueryPlan(s.getKeyspace(), s);
        while (it.hasNext() && replicas < RF) {
            hosts.add(it.next());
            replicas++;
        }

        List<Statement> tokenBatch = batches.get(hosts);
        if (tokenBatch == null) {
            tokenBatch = new ArrayList<>();
            batches.put(hosts, tokenBatch);
        }
        tokenBatch.add(s);
    }

    return new ArrayList<>(batches.values());
}

Token-aware batches are more efficient than batches that reference multiple partition keys. However, they are still not nearly as efficient as asynchronous execution of individual statements:

Pattern                         Writes/sec
Batch Statements                7100
Token-Aware Batch Statements    9100
Asynchronous Statements         13000

Temporal Locality

The use case where token-aware batches shine is when the data within each batch exhibits high temporal locality with respect to the partition key. That is, when multiple messages in a batch refer to the same partition key. One can go as far as choosing the partition key based on the attributes that exhibit temporal locality. There’s a downside to this approach as it tends to create wide rows. Experts have suggested ways to overcome this through row partitioning (Advanced Time Series with Cassandra).
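One common form of row partitioning is to add a time bucket to the partition key, so that a single source starts a new partition at regular intervals. The sketch below is an assumption about how such a key could be built, not a scheme taken from the benchmark: BucketedKey, the source id, and the choice of a UTC day as the bucket are all hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: BucketedKey and the day-sized bucket are hypothetical choices.
class BucketedKey {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("uuuuMMdd").withZone(ZoneOffset.UTC);

    // Partition key made of the source id and the UTC day of the event.
    // Events from one source on one day share a partition; the next day
    // starts a new one, which bounds the width of each row.
    static String partitionKey(String sourceId, Instant eventTime) {
        return sourceId + ":" + DAY.format(eventTime);
    }
}
```

Messages from the same source that arrive in the same micro-batch will almost always fall into the same bucket, so this kind of key preserves the locality that token-aware batches depend on.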

Temporal locality can perhaps be better defined using cache performance terms. P% locality in a batch corresponds to a P% cache hit rate, assuming the messages were placed in a cache according to their partition key. At 40% locality, the token-aware batch approach has comparable performance to asynchronous execution. For higher temporal locality, token-aware batching surpasses the asynchronous execution pattern in terms of writes per second.

Pattern (writes/sec)            0% Locality  20% Locality  40% Locality  60% Locality  80% Locality
Batch Statements                       7100          7500          8300         10600         16000
Token-Aware Batch Statements           9100         10200         12600         17700         29400
Asynchronous Statements               13000         13100         12700         13700         13200
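The locality percentage used in the table can be estimated for any batch from its partition keys. The sketch below reads the cache analogy as an unbounded cache keyed by partition key, which is an assumption: the first message for a key counts as a miss and every later message for the same key counts as a hit. BatchLocality is a hypothetical helper.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: BatchLocality is a hypothetical helper.
class BatchLocality {
    // Hit rate of an unbounded cache keyed by partition key, in the range 0.0 to 1.0.
    static double locality(List<String> partitionKeys) {
        if (partitionKeys.isEmpty()) {
            return 0.0;
        }
        Set<String> seen = new HashSet<>();
        int hits = 0;
        for (String key : partitionKeys) {
            if (!seen.add(key)) {   // add returns false when the key was already seen
                hits++;
            }
        }
        return (double) hits / partitionKeys.size();
    }
}
```

A batch with the keys a, b, a, a, c has two hits out of five messages, which is 40% locality. Measuring this on real batches gives a rough indication of which column of the table applies to a given workload.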