A few days ago, I had to design a high-volume, write-based fan-out architecture.
For people who are new to this school of thought, here is a simple explanation. A write-based fan-out architecture applies all the business logic at write time. The idea is to keep a view ready for each user and each use case, so that when someone wants to read the data, they don't have to apply complex logic. Reads then become simple, and in most cases you can guarantee near-constant read time. Twitter's architecture is a write-based fan-out system.
Without getting into the details of the requirements, here is the 60k-foot overview.
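To make the idea concrete, here is a minimal sketch of fan-out on write. It is not our production code: the `followers` graph, the `timelines` store and both function names are hypothetical, and plain in-memory dictionaries stand in for the real databases.

```python
from collections import defaultdict

# Hypothetical in-memory stand-ins for the social graph and the per-user views.
followers = {"alice": ["bob", "carol"], "bob": ["carol"]}
timelines = defaultdict(list)  # user -> precomputed list of posts, newest first


def publish(author, text):
    """Write path: do the work now by copying the post into every follower's view."""
    post = {"author": author, "text": text}
    for follower in followers.get(author, []):
        timelines[follower].insert(0, post)


def read_timeline(user, limit=10):
    """Read path: no joins or ranking, just return a slice of the precomputed view."""
    return timelines[user][:limit]


publish("alice", "hello")
publish("bob", "hi there")
print(read_timeline("carol"))
```

The write does work proportional to the number of followers, while the read is a single lookup. That is the trade this kind of architecture makes.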
High volumes of writes
Almost constant read time required
Had to be fault tolerant and scalable on commodity hardware
Free text search and social graph traversal were also required
Real time analytics
The architecture we designed involved three databases: MongoDB for storing the incoming data, Redis for storing the datasets tailored to each user, and ElasticSearch for storing the text that required free-text or partial-text searches.
For each incoming dataset, business logic decided which datasets to populate in Redis (based on social graph connections) and what to extract and store in ElasticSearch for free-text searches.
Sounds simple enough!
Given this, I decided to use Apache Kafka as the message broker for its speed and reliability, and Storm to process the data and implement the write-based fan-out architecture.
The devil is in the details. And that is what I intend to share here. Before using Kafka and Storm there are a few things that you should understand about each.
Kafka - The Message Queue
Kafka is an elegant message queue. You can use it as a traditional queue or as a pub-sub (broadcast) system. How does it do that?
Here is how the official documentation explains it:
"Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these—the consumer group.
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers."
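The delivery rule in that quote can be sketched with a toy model. This is not the Kafka client API: the `Topic` class and its methods are made up for illustration, and real Kafka assigns partitions to the consumers in a group rather than rotating message by message as this sketch does.

```python
from collections import defaultdict
from itertools import count


class Topic:
    """Toy model of consumer-group delivery; not the Kafka client API."""

    def __init__(self):
        self.groups = defaultdict(list)  # group name -> list of consumer inboxes
        self._next = defaultdict(count)  # group name -> round-robin counter

    def subscribe(self, group):
        inbox = []
        self.groups[group].append(inbox)
        return inbox

    def publish(self, message):
        # Every group gets the message once, on exactly one of its consumers.
        for group, inboxes in self.groups.items():
            index = next(self._next[group]) % len(inboxes)
            inboxes[index].append(message)


topic = Topic()
worker_1 = topic.subscribe("workers")  # same group: the two share the load
worker_2 = topic.subscribe("workers")
auditor = topic.subscribe("audit")     # different group: sees every message

for message in range(4):
    topic.publish(message)

print(worker_1, worker_2, auditor)
```

The two "workers" consumers split the messages between them like a queue, while the single "audit" consumer receives all of them, like a pub-sub subscriber.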
Quick summary of the salient features of Kafka:
Topics are further divided into partitions.
Message order is guaranteed only inside a partition.
Producers can decide which partition to send the data to.
With just this much information, it is logical to create topics based on classification: for every new type of data, we created a new topic. Taking Twitter as an example, we can create a topic called "tweets" and push all tweet creation data into it. But following a user is a totally different use case, so based on the same classification we create a new topic called "follow". All data related to the follow-user action is sent to this topic.
Now let's look at ordering. Ordering is guaranteed only inside a partition of a topic, and every topic can have multiple partitions. Each message goes to exactly one partition in a topic.
Given this, how do we achieve ordering consistently? Take Twitter again: if a user posts 10 tweets, you would like to see them in the same chronological order.
That gives us two options. The first is to have many topics with only one partition each, for example one topic per user. That way, you always maintain the order in which the messages came in. But it would mean hundreds of millions of topics (one for each user).
The other option is to have one topic and a partition for each user. That way, too, you can guarantee order, but it means one topic with hundreds of millions of partitions.
What we learnt was that neither approach is optimal. Too many topics or too many partitions caused performance issues. If you read about the architecture, it is clear that both approaches create overhead that degrades performance. I will not get into why that happens, but I will tell you how we solved it.
Every producer can decide which partition in a topic it sends data to. That gave us the option of having a fixed number of partitions and dividing the users evenly across them. After a lot of performance tests and optimizations, we found that with average commodity hardware and a 3-node cluster, 15k partitions was the optimum. So we divided our user input content evenly across the 15k partitions. Rather than having a partition for each user, we used one partition for a fixed set of users. That allowed us to ensure ordering for a user without having millions of partitions.
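One way to divide users across a fixed number of partitions is a stable hash of the user id, sketched below. The hash-modulo scheme, the `partition_for` name and the sample user ids are assumptions for illustration, not necessarily the exact scheme we ran in production.

```python
import zlib

NUM_PARTITIONS = 15_000  # the figure from the text; tune it for your own cluster


def partition_for(user_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a user to a fixed partition with a stable hash.

    crc32 is used instead of the built-in hash() because hash() of a str
    changes between Python processes, which would scatter one user's
    messages across partitions and break ordering.
    """
    return zlib.crc32(user_id.encode("utf-8")) % num_partitions


# Toy partitions: every message from one user lands in the same list, in send order.
partitions = {}
for seq, user in enumerate(["alice", "bob", "alice", "alice", "bob"]):
    partitions.setdefault(partition_for(user), []).append((user, seq))

alice_messages = [m for m in partitions[partition_for("alice")] if m[0] == "alice"]
print(alice_messages)
```

Because a user always maps to the same partition, that user's messages stay in order even though many users share each partition.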
Storm - Massive Processing Engine
Storm is a real-time processing engine. It is close to MapReduce, except that it is always running, hence real time. You can have parallel units of work processing data, and accumulate data at the end of a batch if you need such a thing. The terms used in Storm are "spouts" (sources of data) and "bolts" (processing steps). Spouts and bolts are wired together to run as one unit called a "topology".
But the real problem was ensuring exactly-once processing. That is, how do you guarantee that a message is read from the Kafka queue and successfully processed only once? And what happens if processing a message throws an exception and you want to reprocess it?
Storm has an abstraction over spouts and bolts called Trident. It is like Pig for Hadoop. One flavor of Trident spout is the opaque transactional spout, which provides exactly-once processing guarantees, and the official distribution of the latest Storm release comes with an "OpaqueTridentKafkaSpout". We used that to guarantee exactly-once processing of messages from Kafka.
The other important takeaway was how to deal with failed processing. The trick is to throw a "new FailedException()". A FailedException does not mark the messages as processed, so they will be reprocessed. That guarantees you will not lose a message when connections to the DB are temporarily lost due to network issues or similar problems. But handle this with care, and make sure you do not write duplicate data when a message is reprocessed.
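Here is a rough sketch of that fail-and-replay pattern combined with a duplicate-safe write. It runs without Storm: the `FailedException` class below is a local stand-in for Storm's, the `run` loop stands in for the framework's redelivery, and the store, the simulated outage and the message shape are all hypothetical.

```python
class FailedException(Exception):
    """Local stand-in for Storm's FailedException, so the sketch runs without Storm."""


store = {}             # hypothetical database, keyed by message id
outages = {"left": 1}  # simulate one transient connection failure


def write_to_db(message):
    if outages["left"] > 0:
        outages["left"] -= 1
        raise ConnectionError("database unreachable")
    # Upsert keyed by message id: replaying the same message cannot create a duplicate.
    store[message["id"]] = message["body"]


def process(message):
    try:
        write_to_db(message)
    except ConnectionError as exc:
        # Signal "not processed" so the message is replayed later.
        raise FailedException(str(exc)) from exc


def run(messages, max_attempts=3):
    """Toy replay loop standing in for the framework's redelivery."""
    for message in messages:
        for _ in range(max_attempts):
            try:
                process(message)
                break
            except FailedException:
                continue


run([
    {"id": "m1", "body": "hello"},
    {"id": "m1", "body": "hello"},  # a replayed duplicate
    {"id": "m2", "body": "bye"},
])
print(store)
```

Keying the write on a message id is one way to make reprocessing harmless. An append-only write would store the replayed message twice.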
These are the learnings from our system. It is a big beast but when used wisely works like a charm.
Hope it helps.