
Asyncio Tarantool Queue: Get in the Queue


Learn about using Tarantool Queues to solve certain tasks. Specifically, learn about working with Tarantool Queue in Python with asyncio.


In this article, I’m going to pay specific attention to processing information with Tarantool queues. I’ll show how we use them to solve real tasks, and I’ll specifically describe our work with Tarantool Queue in Python with asyncio.

The Task of Notifying an Entire User Base

Mail.Ru runs several media sites, like News, Auto, Lady, Health, Hi-Tech, etc. Each one of them is visited by millions of users daily. The sites are adapted for mobile devices and the majority of them offer a touchscreen version. For the convenience of our users, we’ve created the News mobile application, which is quite popular on Android and iOS devices. Basically, when a “hot” piece of news is published, the user receives a push message. It goes something like this: the chief editor chooses a piece of news, presses “fire” on a management platform, and that’s it — go! But what next? We need to send out the news as quickly as possible to all subscribers.

If someone receives the push message 30 minutes later, the news might not be that hot anymore, and the user might have heard it from a different source. That’s no good for us.

Our database is stored in Tarantool. We need to query it as quickly as possible and send push messages to all subscribers. For each of them, there is a push token and some device info (app version, screen resolution, time zone, the best time for the user to receive a message) stored in the database. It’s particularly important to specify the time zone since it’s not a good idea to send out push messages at night when everyone is asleep.

The requirements are clear, so let’s move on.

Solving the Task

For any task, we always start with a straightforward approach. The pseudocode is nice and clear:

while «There are users»:
    Choose «batch» of users
    Send a push message to every user from the «batch»

The main while loop will run until all the users have been notified. If the database isn’t that big, there is nothing else we need to do. Problem solved. For a big database, however, this simple solution will produce unacceptable lags in message delivery. So, what can be improved here? How can we make this loop faster? How can we make the send-out time constant regardless of the size of the database? To answer these questions, we will need to specify the details of the message-sending process.
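To make this concrete, here is a minimal sketch of the naive loop. It assumes the tarantool-python connector, a hypothetical user.select_range stored procedure that returns users ordered by id, and a hypothetical send_push helper:

import tarantool

BATCH_SIZE = 1000

def send_push(user):
    pass  # stand-in for the real per-platform send

# Connect to the Tarantool instance that stores the user base.
tnt = tarantool.connect("127.0.0.1", 3301)

last_id = 0
while True:
    # Fetch up to BATCH_SIZE users with id greater than last_id.
    batch = tnt.call("user.select_range", (last_id, BATCH_SIZE))
    if not batch:
        break  # no users left: the send-out is finished
    for user in batch:
        send_push(user)  # blocking send: this is the bottleneck
    last_id = batch[-1][0]  # assumes the user id is the first field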

The solution to the queue problem is fairly obvious: segregate the process of picking users from the database from that of dispatching the messages to the platforms. But the devil is in the details. Since the messages are sent to the two platforms in two different ways, we can divide the users from a given “batch” into iOS and Android groups and add a dispatch message for each group to the required queue. Then the messages can be processed; in other words, sent out. Schematically, all of these processes look like this:

[Figure: going through the user database and processing messages via a queue]

What’s good about this approach? We separate the process of going through the user database from that of sending push messages, so the “batch” selection (select_range) in our initial cycle goes faster. If we later encounter problems with one of the platforms during message processing (which happens frequently), it won’t affect the other platform. And since we now have logical queues, we can easily parallelize the message processing jobs across server cores. If we need to expand the system a bit, we simply add new logical queues.
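As a sketch of the producer side, assuming a synchronous queue client with a tube(name).put(data) interface (the tube names and the platform field are illustrative, not our production schema):

# Split one "batch" by platform and put each group into its own logical queue.
def enqueue_batch(batch, queue):
    ios = [u for u in batch if u["platform"] == "ios"]
    android = [u for u in batch if u["platform"] == "android"]
    if ios:
        queue.tube("push_ios").put({"users": ios})
    if android:
        queue.tube("push_android").put({"users": android})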

Solving Load and Scaling Problems

If we increase the load on one server, we’ll soon run out of CPU. So should we add one more server? Yes, a completely identical one. But it’s better to design the service for scaling from the start: if we can make the system work on two servers, we’ll easily be able to add a couple dozen more. That’s our principle: a minimum of two servers even when there is no real load. Multiple servers also enhance the reliability of the service. The architecture, in this case, looks like this:

[Figure: working with the user database on two servers]

So we have two servers with their own queues (of course, there is also the user database; let’s assume it exists somewhere close by and is available for running select_range, but we won’t pay attention to this detail). It’s very important to run the “go through” cycle simultaneously on both servers. We could iterate the cycle on one of the servers, choose the batches there, put each batch in a different queue, and evenly distribute the “batches” among the servers. But with this approach we would have to “move” the data around the network, and choosing a “batch” and putting it in a queue on a different server is the weak point of the scheme. Instead, we need to run select_range in parallel across the servers.

In order to do that, we have to pick a “batch” from one of the servers and add a small message with the user id from the current “batch.” After processing this small message on the second server, we get a new “batch” starting with the specified id, then we create an identical message for the “neighboring server” etc. until we are finished with the whole database. The current “batch” must always be processed locally in its own queue. Thus, we’ll move the code to our data, split the “batch” building among the servers, and won’t move the data around the network.
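Here is a sketch of that hand-off; every name is hypothetical (db.select_range, the next_batch tube, the enqueue_batch helper from the sketch above), but the shape matches the description:

BATCH_SIZE = 1000

def relay_worker(local_queue, neighbor_queue, db):
    # Each server consumes small {"last_id": N} markers from its local tube.
    tube = local_queue.tube("next_batch")
    while True:
        task = tube.take()
        last_id = task.data["last_id"]
        # select_range runs locally: user data never crosses the network.
        batch = db.select_range(last_id, BATCH_SIZE)
        if batch:
            # Hand the marker to the neighbor before processing our own batch.
            neighbor_queue.tube("next_batch").put({"last_id": batch[-1]["id"]})
            enqueue_batch(batch, local_queue)
        task.ack()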

The sequence diagram would look like this:

[Figure: sequence diagram of the hand-off between servers]

The “for all the users” cycle is performed via queue.put(last_id). The dispatch process will end when select_range runs out of users. It’s crucial that the sending does not block the database. This is similar to the MapReduce process in Hadoop — it’s the same “divide and conquer” principle.

The same architecture is used in our production environment. Individual logical queues are used for every type of mobile device and platform, which allows for independent parallel processing and great performance. It takes about two minutes to send a news push message to all two million users in our database; while doing so, the eight-server cluster sends about 10,000 push messages per second.

Specifics of Writing Code for Tarantool Queue

How do we work with a large number of logical queues? How do we simultaneously consume and produce data for all the queues in one Python process? Asynchronous programming techniques to the rescue. In the following examples, I’m using CentOS 6.4, Python 3, asyncio, aiotarantool_queue, Tarantool 1.6, and Tarantool Queue.

Tarantool Queue can handle quite a heavy load. Several logical queues, called “tubes,” can be created in one Tarantool Queue instance via queue.create_tube calls, and several types of tubes are supported. To consume a task from the queue, there is a “take/ack” mechanism: “take” marks the task as “in process,” and “ack” deletes the task from the queue, confirming its successful completion. If a task never receives an “ack,” another process can “take” it again and finish the job. Task execution can also be postponed with the delay parameter. Few queue implementations offer comparable functionality and performance.
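In the same pseudocode style as the consumer loop shown below, the semantics look roughly like this (the release call and the 60-second delay are illustrative):

task = queue.take(tube="q1")   # "take": the task is now marked "in process"
try:
    process(task)
    task.ack()                 # "ack": the task is deleted from the tube
except TemporaryFailure:
    task.release(delay=60)     # put it back; retry no earlier than in 60 seconds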

So we install Tarantool and put github.com/tarantool/queue into /usr/local/lua/tnt-queue. In the /etc/tarantool/instances.enabled/q1.lua Tarantool configuration script, we specify:

#!/usr/bin/env tarantool
-- Make the queue module from /usr/local/lua/tnt-queue visible to require().
package.path = package.path .. ';/usr/local/lua/tnt-queue/?.lua'
-- Listen on port 3301 with a 2 GB slab arena.
box.cfg{listen = 3301, slab_alloc_arena = 2}
queue = require 'queue'
queue.start()
box.queue = queue

Let’s start our instance of the queue:

$ tarantoolctl start q1

Enter the console:

$ tarantoolctl enter q1

/usr/bin/tarantoolctl: Connecting to /var/run/tarantool/q1.control
/usr/bin/tarantoolctl: connected to unix/:/var/run/tarantool/q1.control

Allow guest access and create the q1 logical queue:

> box.schema.user.grant('guest', 'read,write,execute', 'universe')
> queue.create_tube('q1', 'fifo')
^D

One queue can be drained like this:

queue = Tarantool.Queue(host="localhost", port=3301)  # pseudocode: connect to the queue
while True:
    task = queue.take(tube="q1")  # "take": get the next task, mark it in-process
    process(task)                 # user-defined handler
    task.ack()                    # "ack": confirm completion and delete the task

To drain N queues, we create N processes. Each process connects to its queue and runs the exact same cycle. This approach works, but with many queues there will be many connections to Tarantool Queue and many running processes consuming system resources. A large number of connections makes Tarantool less efficient. Finally, we’ll also need connections to Google’s and Apple’s push servers, and once again, the fewer connections we have, the lower the load and the more server resources are available.

In my next article, I’ll explain why using just one Tarantool connection can significantly increase performance (which is very important for our use case). The same approach can be used here. We can modify our initial pseudocode to work with two queues and then adapt it for asyncio.

import asyncio
import aiotarantool_queue

@asyncio.coroutine
def worker(tube):
    while True:
        # Wait up to 0.5 seconds for the next task before giving up.
        task = yield from tube.take(.5)
        if not task:
            break
        # The send-out logic goes here:
        # process(task.data)
        yield from task.ack()

loop = asyncio.get_event_loop()
queue = aiotarantool_queue.Queue("127.0.0.1", 3301, loop=loop)
# One coroutine per logical queue, all sharing a single connection.
workers = [asyncio.async(worker(tube), loop=loop)
           for tube in (queue.tube('q1'), queue.tube('q2'), queue.tube('q3'))]
loop.run_until_complete(asyncio.wait(workers))
loop.run_until_complete(queue.close())
loop.close()

In this process, we create a single connection to the queue and a coroutine with a “take/ack” cycle for each of the logical queues. Then we start the event loop and drain the queues. And that is our queue work pattern.

I’d like to mention that the code remains linear; there are no callbacks. There is also a hidden feature under the hood: tasks are read from the queue in “batches,” which aiotarantool_queue provides out of the box. And no busy waiting, queue polling, or timeouts! Of course, you’ll have to create several processes like this to distribute the load across all CPU cores, but that isn’t a big deal: queue handling in those processes would look about the same, with processes instead of coroutines. A synchronous approach, on the other hand, would make this code more confusing and, more importantly, less efficient.
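A minimal sketch of that fan-out, assuming the consumer above is wrapped in a hypothetical consume() function:

import multiprocessing

def consume():
    # Build the event loop, the aiotarantool_queue connection, and the
    # worker coroutines exactly as in the listing above.
    pass

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=consume)
             for _ in range(multiprocessing.cpu_count())]
    for p in procs:
        p.start()
    for p in procs:
        p.join()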

But asyncio has its cons. Our Python program may have to use other libraries that aren’t ready for an asynchronous environment. This isn’t hard to deal with, but it may require closely studying a library’s code and adapting its blocking calls into asyncio calls. If you need an efficient service, the effort of adapting external libraries will pay off.
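When fully adapting a library isn’t worth it, one common fallback is to push the blocking call onto the event loop’s thread pool. A sketch, with blocking_send standing in for a blocking library call:

import asyncio

def blocking_send(token, payload):
    pass  # stand-in for a blocking library call (e.g. an HTTP request)

@asyncio.coroutine
def send(loop, token, payload):
    # Run the blocking call in the default executor so it
    # doesn't stall the event loop.
    yield from loop.run_in_executor(None, blocking_send, token, payload)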

But What About Redis and RabbitMQ?

Why do we use Tarantool Queue and not Redis or RabbitMQ? It wasn’t easy to decide on a product: we considered both Redis and RabbitMQ. We even had a prototype on Redis. All of these solutions have decent efficiency. But it’s not just about “which one is faster”...

First of all, we need the queue to be reliable and not located entirely in memory. Tarantool with its write-ahead log (WAL) looks more reliable here than Redis and RabbitMQ.

Each queue system has its own specifics. Redis offers a “pub/sub” mechanism, but that doesn’t suit our needs: we need an actual queue. Redis also has lists with “rpush/blpop” operations that block and wait for new data, but there is no “take/ack” mechanism. That mechanism is what gives us reliability in production, and it has proven its worth time and again.
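For comparison, here is a sketch of how “take/ack” is usually emulated on top of Redis lists with BRPOPLPUSH (the queue and list names are illustrative):

import redis

r = redis.Redis()

def process(task):
    pass  # stand-in for the actual handler

# "take": atomically move the task into a per-worker processing list.
task = r.brpoplpush("q1", "q1:processing", timeout=5)
if task:
    process(task)
    # "ack": remove the finished task from the processing list.
    r.lrem("q1:processing", 1, task)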

RabbitMQ offers lots of different queue templates, but we would only need a small piece of its functionality to solve our task. Its performance is really high; however, once we start saving data to disk, performance declines significantly. Another point is that running RabbitMQ requires experienced system administrators who can diagnose production issues rather than simply restart an instance.

I’d like to pay special attention to the RabbitMQ Python API and its asyncio connector. The queue API is implemented with callbacks, and callback code can get complicated and hard to support. To perform message.ack from asyncio, we would have to create a Future and wait for it, which makes the code overly complex. We also couldn’t send several “put/take” calls over a single connection.
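That bridge would look roughly like this sketch, assuming a hypothetical callback-style ack API on the message object:

import asyncio

@asyncio.coroutine
def ack_message(loop, message):
    fut = asyncio.Future(loop=loop)
    # Hypothetical callback-style API: resolve the Future once the
    # broker confirms the ack.
    message.ack(callback=lambda: fut.set_result(None))
    yield from fut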

Redis does much better with asyncio: there is a great connector, made by the creator of asyncio, and it works really fast.

To Sum Up

We talked about an architecture that parallelizes work on a user database across several servers with the help of queue systems. We reviewed usage patterns for Tarantool Queue with asyncio and looked at the issues that arise when developing code around queue systems. We also discussed the problems of RabbitMQ and Redis, as well as the benefits of Tarantool Queue.

I hope that readers will find this information useful. Feel free to share your experiences in dealing with queues and tell us why you chose one solution or another.



Published at DZone with permission of Dmitriy Kalugin-Balashov.

