Tarantool: The Other Queue Always Moves Faster
A deep dive into queues with Tarantool.
If you don't use queues, you may simply not know how to use them right. But before learning about queues, you need to figure out what they are in general and where they apply. For most systems, it is enough to process 10,000 or fewer requests per second, which any broker can handle. If you need more, you'll have to dive deeper into queues.
This article discusses what queues are, why they are needed, and how they work. I'll base my explanations on several use cases and describe a variety of solutions. I'll outline the most common problems and how to avoid them. We'll also cover the differences between brokers, their pros and cons, and find out what's best for your purposes.
Why Do We Need Queues?
I want to share my experience with you so you can select and set up a proper queue for every situation. I'll describe how queues work by introducing some use cases. Let's consider the following situation: three servers — a weak one, a powerful one, and a broken one — have to deal with an incoming load. With a classical balancing approach, which evenly distributes the loads across the servers, we get the following outcome:
The weak server will be overloaded, the powerful one will be underloaded, and requests to the broken one will fail. But if you put the incoming requests into a queue, the operational servers will accept them as tasks to process.
Queues are often used for execution scheduling. Suppose your load curve is a classic sinusoid or another wave-shaped graph:
To handle such a load, the hardware must cover the peak value. However, if you postpone the tasks that exceed your optimum, you can reduce the amount of resources to provide. The postponed tasks are stored in the queue until the processing resources become available:
As a result, you don't need as much hardware. Thus, queues help with resource allocation, message replication, fault tolerance, transmission reliability, delivery guarantees, and microservice intercommunication. This is why they are so widely used and come in so many varieties.
IRQ, NCQ, and hardware buffers work according to the queue principle. Operating system kernels include queue mechanisms such as epoll/kqueue, networking, and signal handling, while applications use queues for cross-thread communication and IPC. Queues are indispensable in network interactions, distributed systems, and at the crossroads of different businesses. Queues glue different system levels together: you can find a queue in virtually any layer of your system.
What Is a Queue?
At the structural level, a queue can be defined in simple terms as a message-based communication tool. We put a message into the queue, and someone else gets it out of there:
Distributed systems use slightly more complex approaches, which are outlined below.
Put/Take: 1 → 1
This formula means that a task is put in the queue once and taken out once. Producers put the task in the queue, and consumers take it. Then tasks can be either processed or released back into the queue. In the first case, the task is considered complete, so it is deleted from the queue.
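The put/take semantics above can be sketched in a few lines. This is a toy in-memory model (the `TaskQueue` class and its method names are hypothetical, not any specific broker's API): a task is reserved on take and is either acknowledged, which deletes it, or released back for another consumer.

```python
import collections

class TaskQueue:
    """Toy put/take queue: take() reserves a task; ack() deletes it,
    release() puts it back for another consumer to process."""
    def __init__(self):
        self._ready = collections.deque()   # tasks waiting to be taken
        self._taken = {}                    # task_id -> payload, reserved
        self._next_id = 0

    def put(self, payload):
        task_id = self._next_id
        self._next_id += 1
        self._ready.append((task_id, payload))
        return task_id

    def take(self):
        task_id, payload = self._ready.popleft()
        self._taken[task_id] = payload      # reserve for this consumer
        return task_id, payload

    def ack(self, task_id):
        del self._taken[task_id]            # processing done: delete

    def release(self, task_id):
        payload = self._taken.pop(task_id)  # put back for reprocessing
        self._ready.append((task_id, payload))

q = TaskQueue()
q.put("resize-image")
tid, payload = q.take()
q.release(tid)        # consumer failed: the task goes back to the queue
tid2, _ = q.take()    # another consumer picks the same task up
q.ack(tid2)           # now it is complete and removed from the queue
```

Real brokers add persistence and delivery guarantees around exactly this reserve/ack/release cycle.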
Publish/Subscribe: 1 → * (Pub/Sub)
The producer delivers the message once, but there can be several consumers and use cases. If there are no consumers, the task is deleted or stays in the queue for repeated processing. The main difference from the first approach is that there are several consumers per producer.
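The 1 → * fan-out can be illustrated with a toy hub (the `PubSub` class is a hypothetical sketch): every subscriber gets its own copy of each published message, and a message with no subscribers is simply dropped.

```python
class PubSub:
    """Toy publish/subscribe hub: every subscriber gets its own copy
    of each published message (1 -> * delivery)."""
    def __init__(self):
        self._subscribers = {}              # name -> list of messages

    def subscribe(self, name):
        self._subscribers[name] = []

    def publish(self, message):
        if not self._subscribers:
            return 0                        # no consumers: message dropped
        for inbox in self._subscribers.values():
            inbox.append(message)           # each consumer gets a copy
        return len(self._subscribers)

    def poll(self, name):
        return self._subscribers[name]

hub = PubSub()
hub.subscribe("billing")
hub.subscribe("analytics")
delivered = hub.publish("order-created")    # fans out to both consumers
```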
Request/Response: 1 ⇆ 1
This rare variety is similar to the classic client-server request/response communication method, but with messages. We send a request message; the consumer processes it and sends back a response.
If you search the web for queues, you will find many protocols they are based on: AMQP (Advanced Message Queuing Protocol), MQTT (for hardware communication), STOMP, NATS, ZeroMQ, and many more. I will now briefly cover some queue protocols and their application.
Cloud solutions are based on SQS (Simple Queue Service). Among them are Amazon SQS, VK Cloud Queues (previously known as Mail.ru Cloud Queues), Yandex MQ, CloudAMQP, and more.
There are also specialized brokers: RabbitMQ, Apache Kafka, ActiveMQ, Tarantool Queue, NATS, NSQ.
You can also implement a custom queue on top of a DBMS, using solutions such as PgQueue, Tarantool, or Redis.
Still another class of queues can be called "sockets on steroids". ZeroMQ was the first one in this class. Then NATS appeared and grew into a full-fledged broker. The queues of this kind do not store anything. They provide an interaction interface where one node can publish a message and another can subscribe to it. In fact, this is a thin wrapper around a socket.
Apache Kafka is a replicated, sharded message log for event streaming. Messages are sent according to the publish-subscribe principle, saved to disk, and appended to a log. Kafka limits the number of consumers: one log (partition) can be read by at most one consumer from each consumer group. Unlike classic brokers, Kafka keeps messages in the order they were recorded, which results in strict FIFO ordering. Messages are not deleted after processing, so if some of them were processed incorrectly, you can go back and replay the log to fix the problems.
- Most often used for data analysis, logging, metrics, auditing, heavy processing of streaming data, and data replication.
- Seamlessly integrates with the entire Apache ecosystem and perfectly scales horizontally.
- Is impractical for common solutions like routing.
- Takes a long time to start because of its complex architecture.
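The partition/consumer-group constraint described above can be sketched as a small assignment function (a hypothetical round-robin model, not Kafka's actual rebalancing protocol): within one group, each partition is owned by exactly one consumer, and two consumers never share a partition.

```python
def assign_partitions(partitions, consumers):
    """Kafka-style constraint within one consumer group: each partition
    goes to exactly one consumer; a consumer may own several partitions,
    but two consumers never share one partition."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        owner = consumers[i % len(consumers)]   # simple round-robin sketch
        assignment[owner].append(partition)
    return assignment

# 4 partitions, 2 consumers in one group -> 2 partitions each.
# A third consumer beyond the partition count would simply sit idle.
assignment = assign_partitions(["p0", "p1", "p2", "p3"], ["c1", "c2"])
```

This is also why Kafka scales by adding partitions: the partition count caps the parallelism of a consumer group.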
RabbitMQ is a traditional broker that supports AMQP, MQTT, and STOMP. Unlike Kafka, it has no limit on the number of consumers. Many of them can be connected to a single queue, which makes message processing faster. RabbitMQ supports several storage options: memory, disk, replication. You can even store messages in quorum queues.
RabbitMQ is mainly used as a traditional publish-subscribe broker. It often acts as a data bus layer between microservices. While RabbitMQ is relatively easy to learn, setting up failover cases in it is tricky. You can configure guaranteed storage, but other tools would be handier if you want to create a reliable distributed system.
Managed Cloud Queues
In a managed cloud queue, such as SQS or MQ, the cloud provider takes care of all the tasks. You don't have to set up, configure, or monitor anything. This cloud solution is suitable for a small number of messages.
The main use case is communication between services in the cloud. You aren't tied to a specific broker: today, you may choose to work with Amazon, tomorrow, with VK, and the next day, with Yandex. A managed cloud queue as a connecting link fits nicely with S3 or Lambda. SQS is also used for sending messages from one system to another.
NATS Messaging is a tool for connecting microservices. It provides fast non-persistent messaging, high performance, and great scalability. It suits any use case (pub/sub, put/take, req/res), and thanks to the JetStream engine, it supports stream processing and reliable storage backed by a Raft cluster. It is written in Go and is fairly easy to deploy.
Tarantool is a platform for arbitrary queues. It lets you build any product on top of your data. In particular, you can create queues based on Tarantool Queue, an out-of-the-box broker that supports replication. Tarantool also integrates with streaming queues: for example, it can be connected to Kafka.
If your needs are specific, you can write your queue with custom processing, custom priorities, and any dependencies required. You can build complex queues with custom logic and use transactionality within a single broker. For example, in Kafka, you cannot transfer data from one queue to another if you have an external consumer. But Tarantool can process the task in one queue and save it to another, combining the two operations in a single transaction. This is because Tarantool is a transactional database with quorum replication.
However, Tarantool isn't only designed for custom implementations. It can be used as a high-performance broker in traditional use cases, too. According to the throughput tests we've carried out, Tarantool affords a speed comparable to Kafka, which is considered the leader in speed.
Queue Protocols and Constraints
There are two approaches to communication via queues: either the task is bound to the connection, or it is not. The latter is stateless, like HTTP, REST, or SQS.
Each approach has its pros and cons. Tasks bound to a connection have low latency and instant release. In case of connection failure, a task can be released back for processing. However, such a system is difficult to scale because consumers work directly with the queue node.
A system without a persistent connection is easy to scale, just like HTTP. To do so, you can use classic HTTP or TCP balancing. In this case, however, you'll need to set up message autorelease. When a consumer gets a task over a stateless connection, the task is reserved for that consumer. So if the consumer crashes or simply disappears, the task would be stuck forever. SQS-like systems support autorelease out of the box.
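Autorelease is usually implemented with a time-to-run (TTR) deadline. The sketch below is a toy model (the `AutoReleaseQueue` class is hypothetical): a taken task that is not acknowledged within `ttr` seconds returns to the queue, so a crashed consumer cannot keep it reserved forever.

```python
import collections
import time

class AutoReleaseQueue:
    """Toy queue with autorelease: a taken task that is not acknowledged
    within `ttr` seconds is put back into the ready queue."""
    def __init__(self, ttr):
        self.ttr = ttr
        self._ready = collections.deque()
        self._taken = {}                    # task -> ack deadline

    def put(self, task):
        self._ready.append(task)

    def take(self, now=None):
        now = time.monotonic() if now is None else now
        self._autorelease(now)
        task = self._ready.popleft()
        self._taken[task] = now + self.ttr  # reserved until the deadline
        return task

    def ack(self, task):
        del self._taken[task]               # processed in time: delete

    def _autorelease(self, now):
        for task, deadline in list(self._taken.items()):
            if now >= deadline:             # consumer missed its TTR
                del self._taken[task]
                self._ready.append(task)

q = AutoReleaseQueue(ttr=5.0)
q.put("send-email")
q.take(now=0.0)             # a consumer takes the task... and crashes
task = q.take(now=10.0)     # TTR expired: the task was auto-released
```

SQS implements the same idea with its visibility timeout.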
Now let's look at the typical problems with queues.
Even when everything works and nothing breaks, the queue algorithm itself can be a source of problems. Let's see what kind of algorithms there are.
FIFO (First In, First Out) is a queue where messages are strictly ordered:
LIFO (Last In, First Out) is, in fact, a stack. It is used rarely.
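The difference between the two orderings fits in a few lines with a double-ended queue from the standard library:

```python
from collections import deque

q = deque(["first", "second", "third"])  # messages in arrival order

fifo = q.popleft()  # FIFO: the oldest message comes out first
lifo = q.pop()      # LIFO (stack): the newest message comes out first
```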
Best Effort is the most common queue algorithm.
Most often it works according to the FIFO principle. However, if a consumer fails after accepting a message, that message is released, and the queue is reordered. This is suitable for most tasks and doesn't slow down the rest of the queue.
Sometimes, you need a queue with message prioritization. There is no such mechanism in Kafka, as each message is automatically put at the end of the topic and stays in its initial place. Messages in Kafka cannot be reordered.
Another problem is sub-queuing, where a large system creates a huge number of small queues, one per client.
Streaming brokers often have problems with reprocessing and delayed tasks. If there is no replay mechanism, you have to take the task from the queue and put it back there again.
However, some brokers don't allow putting tasks back into the queue. In this case, you need to configure a dead letter queue and set up ordering.
Even if a task can be released, there may be time-related problems (TTL, TTR, putback). If a task was not completed within a certain time limit, it means the consumer did not process it, so the task is put back in the queue.
Architecture and Performance
Some of these problems stem from the queue algorithm itself: prioritization may lead to starvation. Let's say there are two message streams, one with high priority and one with low priority. If the flow of high-priority messages is sufficient to saturate the consumer, low-priority tasks may never reach the consumer at all.
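Starvation is easy to demonstrate with strict priority scheduling (a toy simulation with a hypothetical `schedule` function): as long as high-priority traffic fills every consumer slot, the low-priority task never runs.

```python
import heapq

def schedule(tasks, slots):
    """Strict priority scheduling: always run the highest-priority
    (lowest number) pending task. Returns the tasks that got a slot."""
    heap = [(prio, i, name) for i, (prio, name) in enumerate(tasks)]
    heapq.heapify(heap)
    executed = []
    for _ in range(min(slots, len(heap))):
        _, _, name = heapq.heappop(heap)
        executed.append(name)
    return executed

# Enough high-priority (0) traffic to fill every consumer slot:
tasks = [(0, f"hi-{i}") for i in range(5)] + [(1, "low-report")]
ran = schedule(tasks, slots=5)
# "low-report" never runs: that is starvation. A common fix is aging:
# gradually raising the priority of tasks the longer they wait.
```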
Also, if the system has an unscalable bottleneck, the queue may turn out throughput-limited.
Of course, the algorithm itself can affect performance. For example, sequential disk writes are much faster than random disk access.
You may also have problems if your queue doesn't take scalability into account. For example, a single node with replication cannot be scaled horizontally, so you might encounter problems in case of oversaturation.
Finally, there is no infinite storage, infinite memory, or infinite disk. Therefore, mind the capacity and always watch how your queue behaves when these resources run out.
Network and Disk Problems
Queues were initially designed for hardware and for in-process software, that is, for situations where nothing gets lost because everything happens inside a single machine.
But with the network, we have to account for undefined behavior. We don't know whether the packet we sent has reached the destination, whether it was received or lost. This is the classic Two Generals' Problem, where the army is divided in two by enemy territory. The two generals must agree to attack or not attack at the same time. If one attacks and the other doesn't, the army loses. They can send messages to each other, but there is no guarantee that the messages will be received.
This problem has no algorithmic solution. The general that sends a messenger needs confirmation that the messenger has reached the other side. But the other messenger, who has the confirmation, can also be lost or intercepted. You can send dozens of messengers and increase the flow to infinity, but there is no 100% guarantee that the generals will come to an agreement.
Meanwhile, distributed systems are expected to provide strict message ordering and exactly-once delivery, which is a special case of the Two Generals' Problem. Despite all my experience with queues, for a long time I hadn't seen such a delivery mechanism actually implemented.
But once I came across a system that implemented this principle. The solution turned out to be very simple. The system included a consumer that communicated with the other side. In case of uncertainty, for example, when it was unclear whether a sent message had reached the destination, the message was put into the dead letter queue. Then a human looked into the queue and triaged the messages. This human played the role of the divine being that tells the generals when to attack!
The original code of the system didn't provide for such a mechanism. In reality, however, we can resort to this approach when it is absolutely necessary.
Besides the network, disk also affects throughput and latency. Depending on the equipment used, latency can be either predictable or unpredictable.
Finally, let's consider a situation where something is broken. There are temporary and permanent failures.
There are many misconceptions about data safety. RAID is a remedy only in the simple case when a single disk crashes; meanwhile, an entire host can fail. Once I had a case where a power supply unit failed on a machine with 16 disks. The power surge burned everything except two disks.
Temporary failures, like a power loss or a network fault, lead to a split-brain situation. Both parts of the system are alive, but they don't know it and cannot come to an agreement.
A failure becomes permanent if a disk, server, or data center dies. It is tempting to believe that a data center cannot burn down, but sometimes it happens. Therefore, you always need to plan for a certain scale of failure, up to the loss of an entire data center.
Failures affect two main metrics in the system. The first one is availability, that is, the ability to save messages. This metric is most important for the producer. The other metric is durability, meaning message integrity and delivery guarantees.
Let's see how durability and availability are achieved in a distributed environment with a queue broker. The simplest and, unfortunately, the most frequent approach is the single instance method: one broker and one queue, or a producer and a consumer.
Unfortunately, this approach cannot be scaled. If the broker is down, your system stops accepting messages. The single-instance system demonstrates low availability and durability. If something breaks or the server reboots, you will most likely lose messages.
The delivery guarantees are X ≤ 1 or X ≥ 1: a message is delivered at most once or at least once, but not exactly once. This system can easily be improved by creating several queues and putting messages into any of the available ones.
Multi-instance systems demonstrate much better metrics. Such a system can be scaled: you can add as many queues as you need. It has high availability: if one of the queues dies, there are still others. But the durability is average, and the guarantees are still the same: X ≤ 1 or X ≥ 1.
A serious drawback of this system is that every message belongs to one queue only. If something happens to the machine, you will lose data. The problem can be solved in several ways, for example, by duplicating messages across several queues.
This will help achieve high availability and durability. One of the queues will definitely deliver the message. But another inconvenience appears. Messages that are to be delivered multiple times must be made idempotent. Otherwise, there must be a check whether the message has already been applied.
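The idempotency check mentioned above usually means deduplicating by message ID before applying the effect. Here is a minimal sketch (the `handle` function and the in-memory `processed_ids` set are hypothetical; a real system would keep the set of applied IDs in persistent storage):

```python
processed_ids = set()   # in production: a persistent store, not a set

def handle(message, apply):
    """Apply a message at most once even if it is delivered twice."""
    if message["id"] in processed_ids:
        return False                    # duplicate: already applied
    apply(message["payload"])
    processed_ids.add(message["id"])
    return True

balance = []
msg = {"id": "tx-42", "payload": 100}
first = handle(msg, balance.append)     # applied
second = handle(msg, balance.append)    # duplicate delivery: skipped
```

With such a check, at-least-once delivery (X ≥ 1) behaves like exactly-once from the application's point of view.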
One of the ways to solve this problem is database replication. In this case, the producer and the consumer work only with the leader. The replicas wait, and if the leader disappears, they take on the role of the new leader and continue working.
If you replace single queues with replica sets, you will get a more advanced system with high scalability, availability, and durability. It will have a guarantee of X ≈ 1 (X ≥ 1):
In such a system, the guarantee of task execution will tend to 1 and never fall below 1.
You can also use quorum databases as a basis. It's similar to the case above, except that classical replication is replaced by quorum replication.
Quorum writing protects against data loss. If a message has been sent and the quorum confirmed it, then it is stored reliably. The quorum has high guarantees: X → 1 (X ≥ 1). However, it also has reduced throughput because the node must confirm each message.
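The quorum rule itself is just majority arithmetic. A minimal sketch (the `quorum_write` function is a hypothetical model of the confirmation decision, not a real replication protocol):

```python
def quorum_write(replica_acks, cluster_size):
    """A write is confirmed only when a majority of replicas ack it.
    `replica_acks` is the number of replicas that stored the message."""
    quorum = cluster_size // 2 + 1
    return replica_acks >= quorum

# Three-node cluster: two acks are enough, one is not.
confirmed = quorum_write(replica_acks=2, cluster_size=3)
unconfirmed = quorum_write(replica_acks=1, cluster_size=3)
```

Waiting for the majority is exactly what trades throughput for durability: the producer blocks until `quorum` nodes have persisted the message.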
Sometimes, another consumer is added to a quorum broker in the hope of getting a clustered system. However, when the system is partitioned, it stops working. Yes, it provides high durability, and the guarantees are X ≈ 1 (X ≥ 1). But all the messages are processed by a single leader, while the replicas only store and confirm them. Therefore, there is no scalability. The availability is also limited: you can't produce or store messages in a part of the system that doesn't have enough nodes, so partitioning will cause your system's availability to drop.
But if we apply the database approach and replace conventional replication with quorum replication, we'll achieve good scalability, high availability, and superb reliability. Our guarantees will be X ≈ 1 (X ≥ 1):
If we have more than one quorum, we need to distribute the nodes correctly. Quorums of different clusters shouldn't share a single availability zone.
What other pitfalls might we encounter?
If we are putting a new system into operation, we need to understand how to monitor it properly. Every queue has limits, even if you haven't set them explicitly. The normal state of a queue is being empty. Every sent message should be processed. If a queue begins to grow, then something is wrong with the system. Therefore, the queue size is the first metric to monitor.
When your queue starts to grow, the second indicator, time, comes in handy. You can measure QoS, that is, the period from the moment a message is enqueued until it is fully processed. Another metric to consider is the processing time: the interval between the moments the consumer takes a message and releases it. These measurements will show where the problem is.
The third metric is the number of repetitions and losses (failures). It is not always possible to keep track of the size of the queue and the time. There might be a situation where ten thousand messages are sent in a single second, and you will find that your queue has started to grow. The reason for that can be unclear since the size of the queue and the time are momentary metrics. Monitoring the number of retries and losses (failures), as well as the flow of messages itself, helps to understand what is happening with the queue.
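The three metric groups above (size, time, retries) can be collected with a few hooks around put/take/done. This is a toy sketch (the `QueueMetrics` class and its hook names are hypothetical; in production these values would feed a monitoring system such as Prometheus):

```python
class QueueMetrics:
    """Toy metric collector for a queue: size, time spent waiting in
    the queue, processing time, and retry count."""
    def __init__(self):
        self.size = 0              # current queue length
        self.enqueued_at = {}      # task_id -> enqueue timestamp
        self.retries = 0           # number of re-deliveries
        self.samples = []          # (wait_time, processing_time) pairs

    def on_put(self, task_id, now):
        self.size += 1
        self.enqueued_at[task_id] = now

    def on_take(self, task_id, now):
        self.size -= 1
        return now - self.enqueued_at.pop(task_id)  # time in queue

    def on_done(self, wait, took):
        self.samples.append((wait, took))

    def on_retry(self):
        self.retries += 1

m = QueueMetrics()
m.on_put("t1", now=0.0)
wait = m.on_take("t1", now=2.5)   # 2.5 s spent waiting in the queue
m.on_done(wait, took=0.3)         # 0.3 s spent processing
```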
If possible, log every message you put in the queue. That will make a lot of logs, but it is better to look for messages in the records than to have them silently disappear in case of an accident.
Set up failure policies. For example, do not accept new messages in case of problems. If this isn't possible, start deleting old unimportant messages and continue receiving new ones.
There are cases where some of the messages in the queue are still alive while others are well past their deadlines. In such cases, you can first process newer messages and then get back to the old ones.
Always have a plan for failure. Your system will definitely crash. It is up to you how it will crash and how you will fix it.
If you have no idea how your system crashes and recovers, you won't be able to recover it. At least you won't do it as fast as you want.
As Confucius said, "Our greatest glory is not in never falling, but in rising every time we fall". Therefore, it is important to know how your system recovers.
What Queue to Use?
Let's summarize our options and discuss what's best in every case.
For a system that tolerates occasional message loss (one or two per million), where services have to exchange messages and high throughput and scalability are required, take NATS. It's simple, actively evolving, and can be configured for persistence. Alternatively, you can take NSQ, which has somewhat weaker characteristics, or ZeroMQ, which doesn't provide any persistence at all.
To give queues a quick try in the cloud with microservices, pick a cloud-based SQS (Simple Queue Service) from your hosting provider (like Amazon, VK Cloud Solutions, or Yandex).
To get started with queues, try simple brokers like RabbitMQ or NATS. Be sure to keep track of reliability and your system settings.
You might need to stream messages, process them, and transfer them to a Big Data system. In this case, you might want to ensure message integrity or use a strict FIFO mechanism, so your best choice would be Apache Kafka. It is designed for this approach, and it has all the mechanisms to process messages and replay them. You can also use JetStream from NATS or Tarantool Enterprise. These systems support streaming architecture, too.
More complex queuing scenarios might involve enqueuing pending messages or setting up a complex pipeline to split and merge messages. For these cases, you have two options. The simpler one is RabbitMQ, and the more complex is Tarantool Queue. With Tarantool, you can build any arbitrary queue configuration.
You can download Tarantool on the official website and get help in the Telegram chat.