In a previous post, I provided an introduction to ZeroMQ. Continuing along with ZeroMQ, today I'd like to take a look at how you manage various "socket options" in ZeroMQ, particularly when it comes to flow control. If you've never used ZeroMQ, I recommend reading my previous post first. Once you're caught up, let's get started...
ZeroMQ "fire and forget"
One of the awesome things you may have noticed about ZeroMQ is that you can send a message without waiting for it to be received. In fact, the endpoint that's going to receive the message doesn't even need to be connected, and your application will happily act as if the message is winging its way along. While this is great for easily getting up to speed with ZeroMQ, there are some things you need to be aware of.
ZeroMQ is not magic; merely a close approximation
When you send a message in on a ZeroMQ socket, internally ZeroMQ is storing that message in an in-memory queue. So long as you don't send messages faster than whatever's downstream can read them, all is well. The problem comes when your downstream end can't process them fast enough.
Let's consider the following sender, which will send a short message as quickly as possible:
import sys import time import zmq context = zmq.Context() sock = context.socket(zmq.PUSH) sock.connect(sys.argv) while True: sock.send(sys.argv + ':' + time.ctime())
Now if we just run this without having a corresponding 'puller' socket draining the queue, we'll eat up memory as the 'pusher' just keeps enqueueing messages. On my laptop, for instance, the python process reached 3GB of virtual memory size in two minutes. Of course, in a real system you'll have a pulling socket, but in many cases your 'pulling' socket may not be able to pull has fast as your pusher can push.
High water mark to the rescue
To handle situations like this, ZeroMQ provides a socket option called a high water mark, accessed as zmq.HWM. This tells us how many messages we want ZeroMQ to buffer in RAM before blocking the 'pushing' socket. To set the high water mark, we just need to use the .setsockopt method:
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.HWM, 1000) sock.connect(sys.argv) while True: sock.send(sys.argv + ':' + time.ctime())
The modified pusher will send 1000 messages and then block, using a maximum of 2.3MB on my laptop. Note that the high water mark must be set before you connect to any clients, as ZeroMQ uses a queue-per-client, and fixes the queue size on connect.
Queueing messages on disk
There is one case in which a sending socket may exceed its high water mark. When you set the zmq.SWAP option on a socket, ZeroMQ will use a local swapfile to store messages that exceed the high water mark. In order to set up an 200KB on-disk swap file, for instance, we could use the following code:
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.HWM, 1000) sock.setsockopt(zmq.SWAP, 200*2**10) sock.connect(sys.argv) while True: sock.send(sys.argv + ':' + time.ctime())
ZeroMQ is designed to deliver messages as reliably as possible by default. One way it does this is by allowing outgoing messages to 'linger' in their queues even when the socket that sent them has been closed. For instance, suppose we have the following single-message pusher:
import sys import time import zmq context = zmq.Context() sock = context.socket(zmq.PUSH) sock.connect(sys.argv) sock.send(sys.argv + ':' + time.ctime()) print 'Exiting...'
Now if we run this without a corresponding "pull" socket, our program will simply sit there saying it's exiting, but never really exiting. This is because by default, the ZeroMQ communication thread will hang around until all its outgoing messages have been sent even if the socket is closed. To modify this behavior, we can set the zmq.LINGER on the socket, setting a maximum amount of time in milliseconds that the thread will try to send messages after its socket has been closed (the default value of -1 means to linger forever):
... sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.LINGER, 1000) sock.connect(sys.argv) sock.send(sys.argv + ':' + time.ctime()) print 'Exiting...'
In the previous article, we already saw the zmq.SUBSCRIBE and zmq.UNSUBSCRIBE options. There are also a number of other socket options available for use with setsockopt. Several of these (zmq.RATE, zmq.RECOVERY_IVL,zmq.RECOVERY_IVL_MSEC, zmq.MCAST_LOOP, zmq.RECONNECT_IVL, and zmq.RECONNECT_IVL_MAX) have to do with multicast sockets (zmq.PGM and zmq.EPGM), so I won't go into them here. Others (zmq.SNDBUF, zmq.RCVBUF, and zmq.BACKLOG) have to do with the underlying OS sockets.
There is one option that's potentially a bit more interesting: zmq.IDENTITY. From the ZeroMQ docs on setsockopt:
If the socket has no identity, each run of an application is completely separate from other runs. However, with identity set the socket shall re-use any existing ØMQ infrastructure configured by the previous run(s). Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared with previous run(s) and so on.
So if you create a socket and set its identity, it will pick up all the other settings you've set on it before. Beware of overusing this setting, however, since it's may be removed soon.
In the setsockopt method, ZeroMQ provides a way to control ZeroMQ sockets at a lower level than the "fire-and-forget" model at the higher level. Particularly if you're building a "pipeline" style application, you need to be aware of its flow control features. (I have particular experience with errors caused by not understanding the zmq.HWM option, for example.)
There is, of course, still more to cover here. In particular, I'll cover the use of devices to construct more elaborate network topologies and the use of ZeroMQ with gevent in future articles. I'd also love to hear about other topics you'd like to read about, so let me know in the comments below!