
Cassandra to Kafka Data Pipeline (Part 2)


Learn about using Cassandra Change Data Capture (CDC) to handle mutations and consider whether this is a better option than Cassandra triggers.


If you haven’t read Part 1 of this blog, you can find it here. There, I laid out the necessary steps for injecting the Kafka cluster into the system before the Cassandra cluster. I also tackled the first step: having a mechanism to push each Cassandra change to Kafka with a timestamp. But only one approach was considered there: Cassandra triggers.

Here, I’ll try out Cassandra Change Data Capture (CDC), so let’s get started.

Data Model

To make comparisons easier later, I’ll use the same data model as in the first part.

CREATE TABLE IF NOT EXISTS movies_by_genre (
  title text,
  genre text,
  year int,
  rating float,
  duration int,
  director text,
  country text,
  PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC);

Infrastructure

The infrastructure is also the same: two Cassandra 3.11.0 nodes, two Kafka 0.10.1.1 nodes, one Zookeeper 3.4.6, and everything packaged to run from Docker Compose.

Cassandra CDC

My impression is that there is not much documentation on CDC, since I struggled to grasp the concepts and how all of it should function. With that in mind, I’ll try to be as detailed as possible to help anyone else having the same problem.

First of all, CDC is available from Cassandra 3.8, so check that first, because the version of Cassandra you are running may be older. The entire documentation on Cassandra CDC can be found here. It’s not much but it still contains useful information.

To turn on CDC, cdc_enabled must be set to true in cassandra.yaml. This will turn on CDC on the node. To enable it cluster-wide, it must be set on every node. Besides that, there are three more CDC-related properties in cassandra.yaml, which makes four in total:

  1.  cdc_enabled: Can be set to true or false, to enable or disable CDC on the whole node; default is false.

  2.  cdc_raw_directory: The directory where commitlog segments are moved; if not set, defaults to $CASSANDRA_HOME/data/cdc_raw. But commitlog segments are moved only when all of the following three conditions are met:

    1. CDC is enabled

    2. Commitlog segment contains at least one mutation for CDC-enabled table

    3. Commitlog segment is about to be discarded

  3. cdc_total_space_in_mb: Total space on disk to use for CDC logs. If the data grows above this value, Cassandra will throw WriteTimeoutException on mutations that include CDC-enabled tables. The default is the smaller of 4,096 MB and 1/8 of the total space of the drive where cdc_raw_directory resides.

  4. cdc_free_space_check_interval_ms: When the space limit is hit (bullet 3), a check is made at this interval to see if space has been freed and writes can continue; the default is 250 ms.

To sum it all up, you enable CDC with cdc_enabled and configure where the data will be placed with cdc_raw_directory, and there is a limit to set (cdc_total_space_in_mb) with a check interval (cdc_free_space_check_interval_ms), as well. If there is no application that will read commitlog segments and delete them after reading, segments will accumulate, and eventually, the entire space defined by cdc_total_space_in_mb will be used up. When that happens, anything written to tables for which CDC is turned on will fail, and it will continue to do so until space is freed.
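The default limit and the "space used up" condition can be sketched in a few lines of Python. This is only an illustration of the arithmetic described above, not how Cassandra accounts for space internally; the function names are made up for this example.

```python
import shutil
from pathlib import Path

CAP_MB = 4096  # upper bound of the default, per the property description


def default_cdc_total_space_mb(volume_total_bytes: int) -> int:
    """Smaller of 4,096 MB and 1/8 of the volume that holds cdc_raw."""
    one_eighth_mb = volume_total_bytes // 8 // (1024 * 1024)
    return min(CAP_MB, one_eighth_mb)


def default_limit_for(directory: Path) -> int:
    """Default limit for the volume on which the given directory lives."""
    return default_cdc_total_space_mb(shutil.disk_usage(directory).total)


def used_mb(directory: Path) -> float:
    """Total size of the regular files directly inside the directory."""
    total_bytes = sum(f.stat().st_size for f in directory.iterdir() if f.is_file())
    return total_bytes / (1024 * 1024)


def space_exhausted(directory: Path, limit_mb: int) -> bool:
    """True when unread segments have filled the configured space."""
    return used_mb(directory) >= limit_mb
```

For a 16 GB volume this gives a default of 2,048 MB; for anything of 32 GB or more, the 4,096 MB cap applies. A monitoring script could call space_exhausted on the cdc_raw directory to warn before writes start failing.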

I have mentioned CDC-enabled tables a few times, but none of these properties deals with individual tables. Setting the properties alone is not enough for CDC to work; it also needs to be turned on for specific tables. That can be done either when creating a table or later on with the ALTER TABLE command.

Create table statement:

CREATE TABLE IF NOT EXISTS movies_by_genre (
  title text,
  genre text,
  year int,
  rating float,
  duration int,
  director text,
  country text,
  PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC) AND cdc = true;

Alter table statement:

ALTER TABLE movies_by_genre WITH cdc = true;

Now that CDC is turned on, Cassandra will move commitlog segments that contain at least one mutation for a CDC-enabled table into the directory specified by the cdc_raw_directory property. Since commitlog segments are immutable, they arrive with mutations from other tables and keyspaces as well, so those need to be filtered out when a commitlog segment is read.
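The filtering step can be sketched as follows. The Mutation class here is a hypothetical, simplified stand-in for whatever a commitlog reader hands back; it is not a Cassandra type.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Set, Tuple


@dataclass(frozen=True)
class Mutation:
    # Hypothetical, simplified view of one mutation read from a segment.
    keyspace: str
    table: str
    payload: str


def only_captured(
    mutations: Iterable[Mutation], captured: Set[Tuple[str, str]]
) -> Iterator[Mutation]:
    """Yield only mutations that belong to a (keyspace, table) we capture."""
    for mutation in mutations:
        if (mutation.keyspace, mutation.table) in captured:
            yield mutation
```

With captured set to {("custom", "movies_by_genre")}, mutations for system tables or other keyspaces that share the segment are silently skipped.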

That is almost all there is to know about CDC and commitlog segments. As mentioned earlier, commitlog segments are moved only when they are about to be discarded, and that happens once the memtable is flushed to disk (either by hitting the memtable limit, by hitting the commitlog limit, or by nodetool flush). With default settings, reaching the memtable or commitlog limit could take a lot of time, especially when CDC is run in a test environment. To speed this up, I have also lowered the values of the commitlog_segment_size_in_mb and commitlog_total_space_in_mb properties. These are the values of all the mentioned properties in cassandra.yaml that I have changed:

cdc_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
cdc_total_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
commitlog_segment_size_in_mb: 1
commitlog_total_space_in_mb: 16

Even with the limits being this low, I don’t want to do inserts, updates, or deletes manually from cqlsh. I use Berserker for this job, which I already used in Part 1 of this series. Berserker is a tool for load testing and load generation. You can specify rates, generate almost any data with Ranger, and currently target Cassandra, Kafka, or HTTP. There are plans to support additional targets in the future, but that is not the topic of this blog.

Reading the Commitlog

In order to read the commitlog segments, I need an application that will listen to directory changes; it is enough to listen only for the create event, since commitlog files are immutable. For that purpose, I have created an application that can be found here. The application monitors the cdc_raw directory and reads all mutations from the commitlog segments moved to that directory. After reading a commitlog segment, the application writes the events to Kafka.
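The "react only to newly created files" idea can be sketched with plain polling in Python. This is not the code of the application mentioned above; it only illustrates the approach. The .log suffix is an assumption about how segment files are named, and the handle callback is a placeholder for the code that would parse a segment and publish to Kafka.

```python
import time
from pathlib import Path
from typing import Callable, List, Set


def poll_new_segments(directory: Path, seen: Set[str]) -> List[Path]:
    """Return segment files that were not present in any earlier poll."""
    fresh = sorted(p for p in directory.glob("*.log") if p.name not in seen)
    seen.update(p.name for p in fresh)
    return fresh


def watch(directory: Path, handle: Callable[[Path], None], interval_s: float = 1.0) -> None:
    """Poll forever, handing each new segment to the callback exactly once."""
    seen: Set[str] = set()
    while True:
        for segment in poll_new_segments(directory, seen):
            handle(segment)  # placeholder: read mutations, send them to Kafka
        time.sleep(interval_s)
```

Because segments never change after they appear, remembering file names is enough; there is no need to track modification events. A real reader would also delete each segment after processing it so that the space limit is never reached.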

Connecting It All Together

I have a Cassandra cluster with CDC turned on for a particular table. That will copy the commitlog segments to a configured location. The custom application will read each segment as it appears in the configured directory, filter out any non-relevant mutations, and process the relevant ones sending them to the Kafka topic. Let’s try making this and connecting it all together.

Docker Image

In the repository, there is a docker directory with a Dockerfile, which will create a CDC-enabled Cassandra node. The only difference between the official Cassandra image and this image is the configuration file, which is located in the docker directory and replaces the standard one. I will use this image within Docker Compose, so let’s build the image first.

While in the Docker directory, create the Docker image by executing the following command:

docker build -t cassandra-cdc .

Docker Compose

docker-compose up -d --scale kafka=2

This command will spin up the cluster. The Docker compose file used is:

version: '3.3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:0.10.1.1
    ports:
      - 9092
    environment:
      HOSTNAME_COMMAND: "ifconfig | awk '/Bcast:.+/{print $$2}' | awk -F\":\" '{print $$2}'"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  cassandra-1:
    image: cassandra-cdc
    ports:
      - 7199
      - 9042
    volumes:
      - /tmp/cdc/cassandra-1:/var/lib/cassandra
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
  cassandra-2:
    image: cassandra-cdc
    ports:
      - 7199
      - 9042
    volumes:
      - /tmp/cdc/cassandra-2:/var/lib/cassandra
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
      CASSANDRA_SEEDS: cassandra-1

CDC Applications

With docker ps, I can see that the cluster is running. Also, at /tmp/cdc, there are data directories for both Cassandra containers. I need to start the listener app once for each Cassandra container. The prepared configuration files are in the config directory.

Beware that bootstrap-servers properties in reader-1.yml and reader-2.yml need to be updated to reflect ports of Kafka brokers for the current run. Otherwise, messages won’t be sent to Kafka. The following commands will start the application twice:

java -jar -Dcassandra.config=file://<path_to_cassandra-cdc>/config/cassandra-1-cdc-tmp.yaml -Dcassandra.storagedir=file:///tmp/cdc/cassandra-1/ <path_to_cassandra-cdc>/target/cassandra-cdc-0.0.1-SNAPSHOT.jar <path_to_cassandra-cdc>/config/reader-1.yml
java -jar -Dcassandra.config=file://<path_to_cassandra-cdc>/config/cassandra-2-cdc-tmp.yaml -Dcassandra.storagedir=file:///tmp/cdc/cassandra-2/ <path_to_cassandra-cdc>/target/cassandra-cdc-0.0.1-SNAPSHOT.jar <path_to_cassandra-cdc>/config/reader-2.yml
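Since the Kafka ports are assigned by Docker on every run, looking them up can be scripted. The sketch below assumes that docker-compose port prints an address such as 0.0.0.0:32768 for each broker, and that the bootstrap-servers property accepts a comma-separated host:port list; both are assumptions to verify against your own setup.

```python
import subprocess
from typing import Iterable, List


def to_bootstrap_servers(port_outputs: Iterable[str], host: str = "localhost") -> str:
    """Turn outputs like '0.0.0.0:32768' into 'localhost:32768,...'."""
    servers = []
    for line in port_outputs:
        port = line.strip().rsplit(":", 1)[1]
        servers.append(f"{host}:{port}")
    return ",".join(servers)


def kafka_port_outputs(broker_count: int = 2) -> List[str]:
    """Ask docker-compose which host port maps to 9092 on each Kafka container."""
    return [
        subprocess.check_output(
            ["docker-compose", "port", f"--index={i}", "kafka", "9092"], text=True
        )
        for i in range(1, broker_count + 1)
    ]
```

Calling to_bootstrap_servers(kafka_port_outputs()) from the directory that contains the compose file yields the value to paste into reader-1.yml and reader-2.yml.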

Now that everything is set, it just needs to be verified by a test.

Testing

For testing, Berserker 0.0.7 with the following configuration will do the trick.

load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: ConstantRateGenerator
  worker-configuration-name: Cassandra
  metrics-reporter-configuration-name: JMX
  thread-count: 10
  queue-capacity: 100000

data-source-configuration:
  values:
    genre: random(['horror', 'comedy', 'action', 'sci-fi', 'drama', 'thriller'])
    year: random(1980..2017)
    rating: random(float(5.5)..float(9.5))
    duration: random(1..150)
    title: random(['Jurassic World', 'Toy Story', 'Deadpool', 'Gravity', 'The Matrix'])
    director: random(['Philippe Falardeau', 'Martin Scorsese', 'Steven Spielberg', 'Ridley Scott'])
    insert: string("INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('{}', {}, {}, {}, '{}', '{}');", $genre, $year, $rating, $duration, $title, $director)
    deleteRow: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {} AND rating = {} and duration = {};", $genre, $year, $rating, $duration)
    deletePartition: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {};", $genre, $year)
    statement:
      consistencyLevel: ONE
      query: random([$insert, $deleteRow, $deletePartition])
  output: $statement

rate-generator-configuration:
  rate: 1000

worker-configuration:
  connection-points: 0.0.0.0:32821,0.0.0.0:32823
  keyspace: custom
  async: false
  bootstrap-commands:
    - "CREATE KEYSPACE IF NOT EXISTS custom WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"
    - USE custom;
    - CREATE TABLE IF NOT EXISTS movies_by_genre (title text, genre text, year int, rating float, duration int, director text, country text, PRIMARY KEY ((genre, year), rating, duration)) WITH CLUSTERING ORDER BY (rating DESC, duration ASC) and cdc = true;

metrics-reporter-configuration:
  domain: berserker
  filter:

Note the connection-points value; ports need to reflect Cassandra containers.

But Berserker is just one component that will generate Cassandra mutations, and to verify that everything is written into Kafka at the end, I also started the Kafka console consumer to listen to cdc-topic.

After a while, JSON messages will start to appear in the Kafka console. Messages do not appear immediately, as they do with Cassandra triggers, because CDC commitlog segments are moved to the cdc_raw directory only once the commitlog total size limit is hit.

Conclusion

Besides not being as immediate as Cassandra triggers, CDC also does not strictly guarantee order. Segments are moved to the cdc_raw directory when they are about to be discarded, but not always in the exact order in which they were created; some segments stay in the commitlog directory for a while. Eventually, all segments arrive, but the application reading them from the cdc_raw directory must handle this situation.
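One way to restore creation order among the segments that have already arrived is to sort them by the segment id embedded in the file name. The sketch assumes names of the form CommitLog-<version>-<segment id>.log, which should be checked against the files in your own cdc_raw directory.

```python
import re
from typing import Iterable, List

# Assumed naming scheme: CommitLog-<version>-<segment id>.log
SEGMENT_NAME = re.compile(r"^CommitLog-(\d+)-(\d+)\.log$")


def segment_id(file_name: str) -> int:
    """Extract the numeric segment id from a commitlog file name."""
    match = SEGMENT_NAME.match(file_name)
    if match is None:
        raise ValueError(f"not a commitlog segment name: {file_name}")
    return int(match.group(2))


def in_creation_order(file_names: Iterable[str]) -> List[str]:
    """Sort by numeric id, so that id 99 comes before id 101."""
    return sorted(file_names, key=segment_id)
```

Sorting only helps with segments that are already present. A segment that shows up late still has to be dealt with, for example by relying on the write timestamp carried in each event.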

There is another caveat that the CDC application needs to worry about: the replication factor. Each mutation ends up in the commitlog of every replica node. Running a listener application on each node will therefore result in duplicate messages being sent to the Kafka cluster. The app will have to handle the duplicates when reading from Kafka or prevent them in the first place. This can sometimes be handled by Kafka’s log compaction.
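On the consuming side, duplicates can be dropped with a bounded memory of recently seen events. The sketch assumes that every replica produces a byte-identical JSON message for the same mutation, which may not hold for every serialization; the class name and capacity are made up for this example.

```python
import hashlib
from collections import OrderedDict


class RecentlySeen:
    """Remembers the last `capacity` event fingerprints and rejects repeats."""

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, event_json: str) -> bool:
        """Return True only the first time a given event is observed."""
        fingerprint = hashlib.sha256(event_json.encode("utf-8")).hexdigest()
        if fingerprint in self._seen:
            return False
        self._seen[fingerprint] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest fingerprint
        return True
```

A consumer would call first_time for each message and process it only when the result is True. The capacity has to cover the longest expected gap between two replicas delivering the same mutation.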

Change Data Capture (CDC) is another approach to handling mutations in Cassandra. It's not as immediate as triggers, but it also does not add any overhead to the write path, which makes it useful for different use cases. As for my use case, next time, I will talk about Cassandra snapshots. Then, we will see whether Cassandra triggers or CDC is a better fit.
