Cassandra to Kafka Data Pipeline (Part 1)

I've wanted to create a system that uses event sourcing at its core ever since I read Martin Kleppmann’s Making Sense of Stream Processing. The book is really amazing. Martin explains all the concepts from their basic building blocks in a simple, understandable way. I recommend it to everyone.

The idea is to have a running Cassandra cluster and to evolve a system with no downtime in such a way that Kafka is the source of truth with immutable facts. Every other system (in this case, the Cassandra cluster) should use these facts and aggregate/transform them for its purpose. Also, since all facts are in Kafka, it should be easy to drop the whole database, index, cache, or any other data system and recreate it from scratch again.

The following diagrams illustrate the system evolution.

[Image: Starting system architecture]

[Image: Target system architecture]

Looking at the diagrams, this seems like a pretty straightforward and trivial thing to do. But there’s more to it, especially when you want to do it with no downtime.

Evolution Breakdown

I tried to break down the evolution process to a few conceptual steps. Here's what I came up with:

  1. Have a mechanism to push each Cassandra change to Kafka with a timestamp.
  2. Start collecting each Cassandra change to a temporary Kafka topic. I need to start collecting before a snapshot is taken; otherwise, there will be a time window in which incoming changes would be lost. It also needs to go to a temporary topic since there's data in the database that should be first in an ordered sequence of events.
  3. Take the existing database snapshot.
  4. Start reading data from the snapshot into the right Kafka topic. Since the data from the snapshot was created first, it should be placed first into Kafka.
  5. After the snapshot is read, redirect the data from the temporary Kafka topic to the right Kafka topic, minding the timestamp of when the snapshot was taken. Getting this step right is essential, and it may be the hardest part. Since change event collection started before the snapshot, some events may exist in the snapshot as well. To avoid inconsistencies, each event should be idempotent, and we need to be as precise as possible when comparing the event timestamp with the snapshot timestamp (see the sketch after this list).
  6. Create a new Cassandra cluster/keyspace/table and Kafka stream to read from Kafka and insert into the new Cassandra cluster/keyspace/table. As a result, the new Cassandra cluster should practically be a copy/clone of the existing one.
  7. Wait for the temporary Kafka topic to deplete. If I change the application to read from the new Cassandra right away and the Kafka temporary topic hasn't caught up with the system, there will be significant read delays (performance penalties). To make sure everything is in order, monitoring the time to propagate the change to the new Cassandra cluster will help. If this number is decent (a few milliseconds), we can proceed to the next step.
  8. Change the application to read from the new Cassandra instead of the old one, while still writing to the old one. Since everything is done within the no-downtime context, the application is actually several instances running on different nodes, and they won’t be changed simultaneously (that would cause downtime). I need to change one at a time while the others still run the old software version. For this reason, the application still needs to write to the old Cassandra, since other application nodes are still reading from it.
  9. When each application instance is updated, change the application to write directly to the right Kafka topic. Now each node, one by one, can be updated to the new application version, which writes directly to Kafka. In parallel, old nodes will write to the old Cassandra, which will propagate to the Kafka topic, and new nodes will write directly to the Kafka topic. When the change is complete, all nodes are writing directly to the Kafka topic and we are good to go.
  10. Clean up. At this point, the system writes to the right Kafka topic. The stream is reading from it and making inserts into the new Cassandra. The old Cassandra and Kafka temporary topics are no longer necessary — it should be safe to remove them.
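To make step 5 a bit more concrete, here is a minimal sketch of what the redirect could look like, using a plain Kafka 0.10 consumer/producer pair. The topic names, the group id, and the idea of passing the snapshot timestamp as a method parameter are my own placeholders for illustration, not part of the actual project.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of step 5: drain the temporary topic into the right topic,
// skipping events that are older than the snapshot.
public class TempTopicRedirect {

    public static void redirect(String brokers, long snapshotTimestamp) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", brokers);
        consumerProps.put("group.id", "temp-topic-redirect");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", brokers);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("temp-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    // Events recorded before the snapshot are already covered by it;
                    // events right on the boundary rely on being idempotent.
                    if (record.timestamp() >= snapshotTimestamp) {
                        producer.send(new ProducerRecord<>("right-topic", record.key(), record.value()));
                    }
                }
            }
        }
    }
}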

Well, that’s the plan, so we’ll see whether it is doable or not.

There are a few reasons why I’ve chosen to evolve an existing system instead of building one the way I want from scratch:

  1. It's more challenging — hence more fun!
  2. Evolving existing systems is the everyday job of software developers; you rarely get a chance to build a system for a fixed set of starting requirements with a guarantee that nothing in it will ever change (except for a college project, perhaps).
  3. When a system needs to change, you have two options: build a new one from scratch and replace the old one when it's ready, or evolve the existing one. I’ve done the former a few times in my life; it might seem fun at the beginning, but it takes awfully long, involves a lot of bug fixing, often ends in catastrophe, and is always expensive.
  4. Evolving a system means making small changes with more control, instead of dropping in a totally new system in place of the old one.
  5. I’m a fan of Martin Fowler’s blog, and Evolutionary Database Design fits particularly nicely with this topic.

Since covering all of this in a single post would make for quite a huge read, I’ve decided to split it into a few. I’m still not sure how many, but I’ll start and see where it takes me. Bear with me.

Data Model

I’ll start with a data model. Actually, it is just one simple table, but it should be enough to demonstrate the idea. The following CQL code describes the table.

CREATE TABLE IF NOT EXISTS movies_by_genre (
  title text,
  genre text,
  year int,
  rating float,
  duration int,
  director text,
  country text,
  PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC);

The use case for this table might not be that common, since the table is deliberately designed to have a complex primary key, with at least two columns in the partition key and at least two clustering columns. The reason is that it makes for richer examples, and handling a complex primary key might be exactly what someone reading this needs.

In order to satisfy the first item of the evolution breakdown, I need a way to push each Cassandra change to Kafka with a timestamp. There are a few ways to do it: Cassandra triggers, Cassandra CDC, a Cassandra custom secondary index, and possibly some others, but I’ll investigate only these three.

Cassandra Triggers

For this approach, I’ll use two Cassandra 3.11.0 nodes, two Kafka 0.10.1.1 nodes, and one ZooKeeper 3.4.6 node. Every node will run in a separate Docker container. I decided to use Docker since it keeps my machine clean and makes it easy to recreate the infrastructure.

To create a trigger in Cassandra, the ITrigger interface needs to be implemented. The interface itself is pretty simple:

public interface ITrigger {

    public Collection<Mutation> augment(Partition update);
}

That’s all there is to it. The interface has been changed since Cassandra 3.0. Earlier versions of Cassandra used the following interface:

public interface ITrigger {

    public Collection<Mutation> augment(ByteBuffer partitionKey, ColumnFamily update);
}

Before I dive into the implementation, let’s discuss the interface a bit more. There are several important points regarding the implementation, which are explained in the interface’s Javadoc:

  1. Implementation of this interface should only have a constructor without parameters.
  2. ITrigger implementation can be instantiated multiple times during the server lifetime. (Depends on the number of times the trigger folder is updated.)
  3. ITrigger implementation should be stateless (avoid dependency on instance variables).

Besides that, the augment method is called exactly once per update, and the Partition object contains all relevant information about that update. You might notice that the return type is not void but rather a collection of mutations; this way, a trigger can be implemented to perform additional changes when certain criteria are met. But since I just want to propagate data to Kafka, I’ll just read the update information, send it to Kafka, and return an empty mutation collection. In order not to pollute this article with a huge amount of code, I’ve created a Maven project that builds the JAR file.
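Before going through the real code, here is a minimal skeleton of what such a trigger could look like. This is a sketch, not the project's actual implementation: the broker list and topic name are hard-coded here for brevity (the real trigger reads them from KafkaTrigger.yml), and the JSON building is left out.

package io.smartcat.cassandra.trigger;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTrigger implements ITrigger {

    private final KafkaProducer<String, String> producer;
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Cassandra instantiates triggers through a no-argument constructor.
    public KafkaTrigger() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cluster_kafka_1:9092,cluster_kafka_2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    @Override
    public Collection<Mutation> augment(Partition update) {
        // Move the work off Cassandra's write path; no mutations are made here.
        executor.submit(() -> producer.send(new ProducerRecord<>("trigger-topic", toJson(update))));
        return Collections.emptyList();
    }

    private String toJson(Partition update) {
        // The real code walks the partition update and builds a JSON payload; omitted here.
        return "{}";
    }
}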

I’ll try to explain the code in the project. Firstly, there is a FILE_PATH constant, which points to /etc/cassandra/triggers/KafkaTrigger.yml and this is where YAML configuration for the trigger class needs to be. It should contain configuration options for Kafka brokers and for topic name. The file is pretty simple since the whole thing contains just two lines:

bootstrap.servers: cluster_kafka_1:9092,cluster_kafka_2:9092
topic.name: trigger-topic

I’ll come to that later when we build our Docker images. Next, there is a constructor which initializes the Kafka producer and ThreadPoolExecutor. I could have done it without ThreadPoolExecutor, but the reason for it is that the trigger augment call is on Cassandra’s write path. In that way, it impacts Cassandra’s write performances. To minimize that, I’ve moved trigger execution to background threads. This is doable in this case since I'm not making any mutations — I can just start the execution in another thread and return an empty list of mutations immediately. If the trigger needs to make a mutation based on partition changes, that would need to happen in the same thread.

Reading data from a partition update in the augment method is really a mess. The Cassandra API is not that intuitive, and I went through a real struggle to read all the necessary information. There are a few different ways to update a partition in Cassandra, and these are the ones I’ve covered:

  1. Insert
  2. Update
  3. Delete director column
  4. Delete title column
  5. Delete both director and title columns
  6. Delete row
  7. Delete range of rows for last clustering column (duration between some values)
  8. Delete all rows for specific rating clustering column
  9. Delete range of rows for first clustering column (rating between some values)
  10. Delete the whole partition

A simplified algorithm would be:

if (isPartitionDeleted(partition)) {
    handle partition delete;
} else {
    if (isRowUpdated(partition)) {
        if (isRowDeleted(partition)) {
            handle row delete;
        } else {
            if (isCellDeleted(partition)) {
                handle cell delete;
            } else {
                handle upsert;
            }
        }
    } else if (isRangeDelete(partition)) {
      handle range delete;
    }
}
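For orientation, here is roughly how those checks map onto Cassandra 3.11's partition API. The method names below are recalled from that codebase, so treat this as a sketch to verify against the exact Cassandra version you run, rather than as the trigger's actual code.

import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;

public class PartitionInspector {

    public void inspect(Partition partition) {
        if (!partition.partitionLevelDeletion().isLive()) {
            // the whole partition was deleted
            return;
        }
        UnfilteredRowIterator iterator = partition.unfilteredIterator();
        while (iterator.hasNext()) {
            Unfiltered unfiltered = iterator.next();
            if (unfiltered instanceof Row) {
                Row row = (Row) unfiltered;
                if (!row.deletion().isLive()) {
                    // a single row was deleted
                } else {
                    for (Cell cell : row.cells()) {
                        if (cell.isTombstone()) {
                            // a cell (column) was deleted
                        } else {
                            // insert or update of this column
                        }
                    }
                }
            } else if (unfiltered instanceof RangeTombstoneMarker) {
                // a range of rows was deleted; the marker carries the bounds
            }
        }
    }
}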

In each case, JSON is generated and sent to Kafka. Each message contains enough information to recreate the corresponding CQL statement from it.

Besides that, there are a few helper methods for reading the YAML configuration, and that is all.
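Loading that two-line file could look something like the snippet below. I'm assuming SnakeYAML here, since Cassandra already uses it for cassandra.yaml and ships it on the classpath; the actual helper methods in the project may differ.

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;

import org.yaml.snakeyaml.Yaml;

public class TriggerConfigLoader {

    private static final String FILE_PATH = "/etc/cassandra/triggers/KafkaTrigger.yml";

    // Returns a map with the "bootstrap.servers" and "topic.name" entries.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> load() throws Exception {
        try (InputStream in = new FileInputStream(FILE_PATH)) {
            return (Map<String, Object>) new Yaml().load(in);
        }
    }
}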

In order to test everything, I’ve chosen Docker, as stated earlier. I’m using the Cassandra Docker image with the 3.11.0 tag. But since the JAR file and KafkaTrigger.yml need to be copied into the Docker container, there are two options:

  1. Use Cassandra 3.11.0 image and docker cp command to copy the files into the container.
  2. Create a new Docker image with files already in it and use that image.

The first option is not really an option, actually; it's not in the spirit of Docker to do such a thing, so I'll go with the second one.

Create a cluster directory somewhere and a Cassandra directory within it:

mkdir -p cluster/cassandra

The cluster directory will be needed later. For now, just create KafkaTrigger.yml in the cassandra directory with the content I provided earlier. The built JAR file (cassandra-trigger-0.0.1-SNAPSHOT.jar) also needs to be copied there. To build all that into a Docker image, I created a Dockerfile with the following content:

FROM cassandra:3.11.0
COPY KafkaTrigger.yml /etc/cassandra/triggers/KafkaTrigger.yml
COPY cassandra-trigger-0.0.1-SNAPSHOT.jar /etc/cassandra/triggers/trigger.jar
CMD ["cassandra", "-f"]

In the console, position yourself in the cassandra directory and run:

docker build -t trigger-cassandra .

That will create a Docker image with the name trigger-cassandra.

All that is left is to create a Docker Compose file, join them all together, and test it. The Docker Compose file should be placed in the cluster directory. The reason is that Docker Compose has a naming convention for the containers it creates: <present_directory_name>_<service_name>_<order_num>. I already specified the Kafka domain names in KafkaTrigger.yml as cluster_kafka_1 and cluster_kafka_2, so if Docker Compose were run from another directory, the container names would change and KafkaTrigger.yml would need to be updated.

My Docker compose file is located in the cluster directory. It’s named cluster.yml and it looks like this:

version: '3.3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
    - "2181:2181"
  kafka:
    image: wurstmeister/kafka:0.10.1.1
    ports:
    - 9092
    environment:
      HOSTNAME_COMMAND: "ifconfig | awk '/Bcast:.+/{print $$2}' | awk -F\":\" '{print $$2}'"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  cassandra-seed:
    image: trigger-cassandra
    ports:
    - 7199
    - 9042
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
  cassandra:
    image: trigger-cassandra
    ports:
    - 7199
    - 9042
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
      CASSANDRA_SEEDS: cassandra-seed

The cluster contains the definitions for ZooKeeper, Kafka, and Cassandra, with the twist that there are two Cassandra services. The reason is that one node can be stand-alone, but all the others need a seed list: cassandra-seed will serve as the seed, and cassandra as the scalable service. That way, I can start multiple instances of Cassandra. However, starting multiple instances takes time, and it is not recommended to have multiple Cassandra nodes in the joining state, so scaling should be done one node at a time. That does not apply to the Kafka nodes. With the following command, I’ve got a running cluster ready for use:

docker-compose -f cluster.yml up -d --scale kafka=2

After that, I connected to the Cassandra cluster with cqlsh and created the keyspace and table.

To add a trigger to the table, you need to execute the following command:

CREATE TRIGGER kafka_trigger ON movies_by_genre USING 'io.smartcat.cassandra.trigger.KafkaTrigger';

If you get the following error:

ConfigurationException: Trigger class 'io.smartcat.cassandra.trigger.KafkaTrigger' doesn't exist

There are several things that can be wrong. The JAR file might not be loaded within the Cassandra node; that should happen automatically, but if it doesn’t, you can try to load it with:

nodetool reloadTriggers

If the problem persists, it might be that the configuration file is not in the proper location, but that can only happen if you are using a different infrastructure setup and forgot to copy KafkaTrigger.yml to the proper location. Cassandra will show the same error even if the class is found but there is a problem instantiating it or casting it to the ITrigger interface. Also, make sure that you implemented the ITrigger interface from the right Cassandra version (the Cassandra versions used in the JAR file and on the Cassandra node should match).

If there are no errors, the trigger is created properly. This can be checked by executing the following CQL commands:

USE system_schema;
SELECT * FROM triggers;

Results

I used kafka-console-consumer to see whether messages end up in Kafka, but any other option is good enough.
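Starting it from inside one of the Kafka containers should look roughly like this; the container name matches my docker ps output further below, while the script path is an assumption about the wurstmeister image, so adjust as needed:

docker exec -it cluster_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic trigger-topic --from-beginning

Here are a few things I tried and the results they gave me.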

-- insert
INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('drama', 2015, 7.4, 110, 'The Good Lie', 'Philippe Falardeau');
{"rows":[{"cells":[{"name":"director","value":"Philippe Falardeau"},{"name":"title","value":"The Good Lie"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- update
UPDATE movies_by_genre SET title = 'a' WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;
{"rows":[{"cells":[{"name":"title","value":"a"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- delete of director column
DELETE director FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;
{"rows":[{"cells":[{"deleted":true,"name":"director"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- delete of title column
DELETE title FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;
{"rows":[{"cells":[{"deleted":true,"name":"title"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- delete of both director and title columns
DELETE title, director FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;
{"rows":[{"cells":[{"deleted":true,"name":"director"},{"deleted":true,"name":"title"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- delete of row
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;
{"rowDeleted":true,"rows":[{"clusteringKey":"7.4, 110"}],"key":"drama:2015"}

-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration > 90;
{"rowRangeDeleted":true,"start":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"end":[{"inclusive":true,"clusteringKey":"7.4"}],"rows":[],"key":"drama:2015"}

-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration < 90;
{"rowRangeDeleted":true,"start":[{"inclusive":true,"clusteringKey":"7.4"}],"end":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"rows":[],"key":"drama:2015"}

-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration > 90 AND duration <= 120;
{"rowRangeDeleted":true,"start":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"end":[{"clusteringKey":"7.4"},{"inclusive":true,"clusteringKey":"120"}],"rows":[],"key":"drama:2015"}

-- delete all rows for specific rating clustering column
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4;
{"rowRangeDeleted":true,"start":[{"inclusive":true,"clusteringKey":"7.4"}],"end":[{"inclusive":true,"clusteringKey":"7.4"}],"rows":[],"key":"drama:2015"}

-- delete range of rows for first clustering column (rating between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating >= 7.5 AND rating <  9.0;
{"rowRangeDeleted":true,"start":[{"inclusive":false,"clusteringKey":"9.0"}],"end":[{"inclusive":true,"clusteringKey":"7.5"}],"rows":[],"key":"drama:2015"}

-- delete whole partition
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015;
{"partitionDeleted":true,"key":"drama:2015"}

In most cases, not all of these mutations are used; usually it’s just insert, update, and one kind of delete. Here, I intentionally tried several ways, since it might come in handy to someone. If you have a simpler table use case, you might be able to simplify the trigger code as well.

It is also worth noting that triggers execute only on the coordinator node; they have nothing to do with data ownership or replication, so the JAR file needs to be on every node that can become a coordinator.

Going a Step Further

This is okay for testing purposes, but for this experiment to have any value, I will simulate mutations to the Cassandra cluster at some rate. This can be accomplished in several ways: writing a small custom application, using cassandra-stress, or using some other tool. Here at SmartCat, we have developed a tool for exactly this purpose, and it is the easiest way for me to create load on a Cassandra cluster. The tool is called Berserker; you can give it a try.

To start with Berserker, I downloaded the latest version (0.0.7 at the moment of writing) from here and created a configuration file named configuration.yml:

load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: ConstantRateGenerator
  worker-configuration-name: Cassandra
  metrics-reporter-configuration-name: JMX
  thread-count: 10
  queue-capacity: 100000
data-source-configuration:
  values:
    genre: random(['horror', 'comedy', 'action', 'sci-fi', 'drama', 'thriller'])
    year: random(1980..2017)
    rating: random(float(5.5)..float(9.5))
    duration: random(85..150)
    title: random(['Jurassic World', 'Toy Story', 'Deadpool', 'Gravity', 'The Matrix'])
    director: random(['Philippe Falardeau', 'Martin Scorsese', 'Steven Spielberg', 'Ridley Scott'])
    insert: string("INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('{}', {}, {}, {}, '{}', '{}');", $genre, $year, $rating, $duration, $title, $director)
    deleteRow: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {} AND rating = {} and duration = {}", $genre, $year, $rating, $duration)
    deletePartition: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {}", $genre, $year)
    statement:
      consistencyLevel: ONE
      query: random([$insert, $deleteRow, $deletePartition])
  output: $statement
rate-generator-configuration:
  rate: 1000
worker-configuration:
  connection-points: 0.0.0.0:32779,0.0.0.0:32781
  keyspace: custom
  async: false
  bootstrap-commands:
    - "CREATE KEYSPACE IF NOT EXISTS custom WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"
    - USE custom;
    - CREATE TABLE IF NOT EXISTS movies_by_genre (title text, genre text, year int, rating float, duration int, director text, country text, PRIMARY KEY ((genre, year), rating, duration)) WITH CLUSTERING ORDER BY (rating DESC, duration ASC);
metrics-reporter-configuration:
  domain: berserker
  filter:

The load-generator-configuration section is used to specify all other configurations. There, the name of each configuration is specified so that Berserker knows which configuration parser to use for the corresponding section. After that comes a section for each configuration, with parser-specific options and format. The following sections are available:

  1. data-source-configuration: Specifies the data source that generates data for the worker.
  2. rate-generator-configuration: Specifies how the rate generator is created and how it generates the rate; this is the rate at which the worker executes.
  3. worker-configuration: Configuration for the worker.
  4. metrics-reporter-configuration: Configuration for metrics reporting; currently, only JMX and console reporting are supported.

In this case, the data-source-configuration section is actually in Ranger's configuration format, which can be found here.

An important part here is the connection-points property within worker-configuration. This will probably be different every time Docker Compose creates the cluster. To see your connection points, run:

docker ps

It should give you a similar output:

CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS              PORTS                                                                       NAMES
f274b40c8dce        wurstmeister/kafka:0.10.1.1    "start-kafka.sh"         3 hours ago         Up 3 hours          0.0.0.0:32784->9092/tcp                                                     cluster_kafka_1
f56b8664e54b        wurstmeister/kafka:0.10.1.1    "start-kafka.sh"         3 hours ago         Up 3 hours          0.0.0.0:32783->9092/tcp                                                     cluster_kafka_2
81a32c580a12        wurstmeister/zookeeper:3.4.6   "/bin/sh -c '/usr/..."   3 hours ago         Up 3 hours          22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp                          cluster_zookeeper_1
efdd29b63faa        trigger-cassandra              "/docker-entrypoin..."   3 hours ago         Up 3 hours          7000-7001/tcp, 9160/tcp, 0.0.0.0:32782->7199/tcp, 0.0.0.0:32781->9042/tcp   cluster_cassandra-seed_1
0fb1311ba3ee        trigger-cassandra              "/docker-entrypoin..."   3 hours ago         Up 3 hours          7000-7001/tcp, 9160/tcp, 0.0.0.0:32780->7199/tcp, 0.0.0.0:32779->9042/tcp   cluster_cassandra_1

There you can find the port mappings for the cluster_cassandra-seed_1 and cluster_cassandra_1 containers and use them; in this case, they are 0.0.0.0:32779 and 0.0.0.0:32781.

Now that everything is settled, just run:

java -jar berserker-runner-0.0.7.jar -c configuration.yml

Berserker starts spamming the Cassandra cluster, and in the terminal where kafka-console-consumer is running, I can see messages appearing. Everything seems to be working as expected, at least for now.

Conclusion

That’s all! Next time, I’ll talk about Cassandra CDC and maybe the custom secondary index. Hopefully, in a few blog posts, I’ll have the whole idea tested and running.

