In Storm's new role, the workload is orders of magnitude greater. Although JMS worked well in the previous integration scenarios, we knew it might not be the best solution for the volume of work we anticipate: we need to support millions of messages on the queues. This is not the typical application of JMS, and it is exactly the reason LinkedIn open sourced Kafka:
"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."
To validate our assumptions, we needed to put Kafka through its paces. That meant plugging it into our Storm topology. For those who don't know Storm, think of it as a "Big Data ESB" optimized for processing streams of data that are broken down into discrete packets called Tuples. Spouts emit tuples. Bolts consume them. Storm plays the role of message router between the components.
We already had our Cassandra Bolt in place. All I needed to do was swap out our JMS Spout for a Kafka Spout. Here is what the topology looked like:
    TopologyBuilder builder = new TopologyBuilder();

    List hosts = new ArrayList();
    hosts.add("localhost");
    SpoutConfig spoutConfig = SpoutConfig.fromHostStrings(hosts, 1, "test", "/foo", "foo");
    spoutConfig.zkServers = ImmutableList.of("localhost");
    spoutConfig.zkPort = 2181;
    spoutConfig.scheme = new StringScheme();
    builder.setSpout("spout", new KafkaSpout(spoutConfig));

    DefaultBatchingCassandraBolt bolt = new DefaultBatchingCassandraBolt(new MyColumnFamilyMapper(), new MyRowKeyMapper(), new MyColumnsMapper());
    bolt.setAckStrategy(AckStrategy.ACK_ON_WRITE);
    builder.setBolt("loader", bolt).shuffleGrouping("spout");
This topology simply connects a Kafka Spout to a Cassandra Bolt.
(WARNING: The above code leverages a change to the Cassandra bolt that is still only in my fork. It may not work for you. Watch this pull request.)
I then queued 10 million JSON records in Kafka, which took about 5 minutes running locally on a MacBook Pro, and unleashed the topology.
Now, Kafka is *fast*. When running the Kafka Spout by itself, I easily reproduced Kafka's claim that you can consume "hundreds of thousands of messages per second". When I first fired up the full topology, things went well for the first minute, but then it crashed because the Kafka Spout emitted tuples faster than the Cassandra Bolt could write them. Even though Cassandra is fast as well, it is still orders of magnitude slower than Kafka.
Fortunately, since Storm interacts with its spouts using a pull model, it provides a way to throttle back the messaging. I added the following parameter to the Config:
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);

This limits the number of un-acked tuples that can be pending on a spout task at any given time. With the AckStrategy set to ACK_ON_WRITE within the Cassandra Bolt, this established a safe way for the Bolt to communicate back to the Spout that it is "ready for some more".
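To make the throttling idea concrete, here is a minimal, single-threaded sketch of how a cap on un-acked tuples keeps a fast source from overrunning a slow sink. It is not Storm code: the class `MaxPendingSketch`, its `simulate` method, and the per-tick rates are all hypothetical names and numbers chosen for illustration, and the loop only models the role that TOPOLOGY_MAX_SPOUT_PENDING plays.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Toy model of a "max pending" limit between a fast spout and a slow bolt.
 * Hypothetical illustration only; none of these names come from Storm.
 */
public class MaxPendingSketch {

    /** Counters collected from one simulated run. */
    public static final class Result {
        public final int emitted;      // tuples handed out by the spout
        public final int acked;        // tuples written and acked by the bolt
        public final int peakPending;  // highest number of un-acked tuples observed

        Result(int emitted, int acked, int peakPending) {
            this.emitted = emitted;
            this.acked = acked;
            this.peakPending = peakPending;
        }
    }

    /**
     * @param totalMessages messages waiting in the source queue
     * @param maxPending    cap on un-acked tuples
     * @param spoutBurst    tuples the spout could emit per tick if unthrottled
     * @param boltRate      tuples the bolt can write and ack per tick
     */
    public static Result simulate(int totalMessages, int maxPending, int spoutBurst, int boltRate) {
        if (maxPending <= 0 || spoutBurst <= 0 || boltRate <= 0) {
            throw new IllegalArgumentException("maxPending, spoutBurst and boltRate must be positive");
        }
        Queue<Integer> pending = new ArrayDeque<>();
        int emitted = 0;
        int acked = 0;
        int peak = 0;
        while (acked < totalMessages) {
            // Pull model: the spout is only asked for a tuple while there is room under the cap.
            int burst = 0;
            while (emitted < totalMessages && burst < spoutBurst && pending.size() < maxPending) {
                pending.add(emitted++);
                burst++;
            }
            peak = Math.max(peak, pending.size());
            // The bolt acks each tuple once it is written, which frees room for more emits.
            for (int i = 0; i < boltRate && !pending.isEmpty(); i++) {
                pending.remove();
                acked++;
            }
        }
        return new Result(emitted, acked, peak);
    }

    public static void main(String[] args) {
        Result r = simulate(100_000, 5000, 50_000, 500);
        System.out.println("emitted=" + r.emitted + " acked=" + r.acked + " peakPending=" + r.peakPending);
    }
}
```

With the cap in place, the backlog of in-flight tuples never grows past `maxPending`, no matter how much faster the source is than the sink. Without a meaningful cap, the backlog grows with the rate difference, which is the failure mode described above.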
With this topology, we saw consistent throughput of 5000 writes per second to Cassandra, running locally on my MBP. That will work nicely when deployed to the cluster. =)
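As a rough back-of-the-envelope check, the figures in this post (10 million queued records, about 5000 writes per second) imply how long the local run needs to drain the queue. The helper below is a hypothetical sketch, and it assumes the write rate stays constant for the whole run.

```java
/**
 * Back-of-the-envelope drain time for a queue at a steady write rate.
 * Hypothetical helper for illustration; assumes constant throughput.
 */
public class DrainTimeSketch {

    /** Seconds needed to write all messages, rounded up to a whole second. */
    public static long secondsToDrain(long messages, long writesPerSecond) {
        if (messages < 0 || writesPerSecond <= 0) {
            throw new IllegalArgumentException("messages must be >= 0 and writesPerSecond > 0");
        }
        return (messages + writesPerSecond - 1) / writesPerSecond; // ceiling division
    }

    public static void main(String[] args) {
        long seconds = secondsToDrain(10_000_000L, 5_000L);
        // prints "2000 seconds (33 min 20 s)"
        System.out.println(seconds + " seconds (" + (seconds / 60) + " min " + (seconds % 60) + " s)");
    }
}
```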
Kafka has some other nice characteristics that make it well suited for big data applications. I'll go into the details of those in a future post.
* Kudos to Taylor Goetz. He has done some great work on the Storm components that has made this possible.