Using ZeroMQ Devices to Support Complex Network Topologies
And if you want some background on Gevent, you might want to check out that series at the following links:
- Introduction to Gevent
- Gevent, Threads, and Benchmarks
- Gevent and Greenlets
- Greening the Python Standard Library with Gevent
- Building TCP Servers with Gevent
- Building Web Applications with Gevent's WSGI Server
ZeroMQ "devices"One of the nice aspects of ZeroMQ is that it decouples the communication pattern with the connection pattern of your endpoints. Without ZeroMQ, you'd commonly have a "server" socket which binds to a port and receives requests, while "clients" connect to that socket and send requests. With ZeroMQ, it's perfectly acceptable to have the "server" connect to the "client" to receive requests (and in fact you can have multiple "servers" connected to the same "client" socket).
While this is handy, in some cases you may want to have both client and server relatively dynamic, with both connecting and neitherbinding to a particular port. For this use case, ZeroMQ provides so-called "devices" that bind a couple of sockets and perform forwarding operations between them to support common communication patterns. The "client" and "server" then both connect to the device. In this section, I'll cover the devices provided by the ZeroMQ library.
The Queue device is responsible for mediating the REQ/REPcommunication pattern. Suppose we have the following request code:
import sys import zmq context = zmq.Context() for x in xrange(10): sock = context.socket(zmq.REQ) sock.connect(sys.argv) print 'REQ is', x, sock.send(str(x))and the following response code:
import sys import zmq context = zmq.Context() while True: sock = context.socket(zmq.REP) sock.connect(sys.argv) x = sock.recv() print 'REQ is', x, reply = 'x-%s' % x sock.send(reply)
To set up a broker that forwards between the two, you need a tiny script:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv) s2.bind(sys.argv)Note that we've used a couple of new socket types above:zmq.ROUTER and zmq.DEALER. These are similar to zmq.REP and zmq.REQ, respectively, but they allow us to break the strict "request-response" lockstep of REQ/REP. The way they work is the following:
- The ROUTER socket receives a message on a particular connection. It adds a message ID to the beginning of the message that identifies the sending REQ socket.
- The FORWARDER device sends the message received on theROUTER to the DEALER socket.
- The DEALER picks a connection to send the message to, stripping the prefix but noting the message ID and linking it to the REP socket handling the message.
- The DEALER receives the response message, pairs it up with the message ID it's responding to, and adds the message ID to the beginning of the mssage.
- The FORWARDER device sends the message received on theDEALER to the ROUTER socket.
- The ROUTER socket strips the message ID and sends the message to the REQ socket that sent the initial request.
By using message IDs as above, we can have multiple messages 'in flight', being handled by various servers. If you'd rather not use the built-in ZeroMQ device, you can build your own fairly simply. The code below shows a device that that uses a pair of threads to relay messages from one socket to another:
import sys import zmq import time context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv) s2.bind(sys.argv) def zeromq_relay(a, b): '''Copy data from zeromq socket a to zeromq socket b''' while True: msg = a.recv() more = a.getsockopt(zmq.RCVMORE) if more: b.send(msg, zmq.SNDMORE) else: b.send(msg) def zmq_queue_device(s1, s2): import threading t1 = threading.Thread(target=zeromq_relay, args=(s1,s2)) t2 = threading.Thread(target=zeromq_relay, args=(s2,s1)) t1.daemon = t2.daemon = True t1.start() t2.start() while True: time.sleep(10)You can also build a device using nonblocking IO, but that's a bit beyond the scope of what I want to cover here.
In the same way the QUEUE device mediates the REQ/REP pattern, theFORWARDER device mediates the PUB/SUB pattern. Since PUB/SUB doesn't require the lockstep operation of REQ/REP, we can use the regularPUB/SUB sockets in our device:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.SUB) s2 = context.socket(zmq.PUB) s1.bind(sys.argv) s2.bind(sys.argv) s1.setsockopt(zmq.SUBSCRIBE, '')
Now we can connect one or more publishers to our device's "upstream" port (sys.argv), and one or more subscribers to the device's "downstream" port (sys.argv) to provide a PUB/SUBbroker. Note in particular that we had to subscribe to all mesages in the device code since we don't know which messages our downstream sockets are interested in.
If you'd rather filter messages that get fowarded, you can subscribe to some subset of messages. Unfortunately, I'm not aware of any way to forward only messages that the downstream clients have subscribed to using built-in ZeroMQ functionality.
If we want to write our own FORWARDER device, it's even simpler than the QUEUE device since it only handles unidirectional communication. Assuming we have the zeromq_relay function as defined above, ourFORWARDER device is just the following:
def zmq_forwarder_device(upstream, downstream):
Similar to the FORWARDER device, the STREAMER device just sends upstream packets downstream, but in this case in support of thePUSH/PULL pattern rather than PUB/SUB. To make a STREAMER broker, then, we just need the following code:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.PULL) s2 = context.socket(zmq.PUSH) s1.bind(sys.argv) s2.bind(sys.argv)And once again, if we wanted to create the device manually, it's just a relay:
def zmq_streaming_device(upstream, downstream):
Integration with geventYou may have noticed that the gevent device function doesn't return. If you want to create multiple devices within your Python program, then, you'll need to wrap the devices in threads.
Another approach that you might use if you, like me, prefer the lightweight threading and asynchronous I/O of gevent, is to use thegevent-zeromq package available from PyPI:
$ pip install gevent-zeromq
Now we can use a 'green' version of ZeroMQ by just importing from the gevent-zeromq wrapper in our scripts. A "pusher" script would then look like this:
import sys import time from gevent_zeromq import zmq context = zmq.Context.instance() sock = context.socket(zmq.PUSH) sock.connect(sys.argv) while True: time.sleep(1)Simple, right? The reason I throw this in this seemingly-unrelated post is that if you want to use gevent, you can't use the built-in devices. This is because the built-in devices block, and they block in the ZeroMQ C library, not in Python where they could be "greened". So if you want a device with gevent, you'll have to write your own (which as, after all, pretty straightforward).
ConclusionZeroMQ devices provide a handy way to stitch together complex routing topologies and allow you to decouple the various components of your architecture. Though the built-in devices are quite simple, they provide insight into how you could build more complex devices yourself to fulfill the role of "broker" in a distributed architecture.
I'd be interested in hearing from you how you are using devices (or whether you find them useful at all) in ZeroMQ programming. Do you use the built-in devices? Any devices you write yourself? Do you prefer the multithreaded device approach I've used here, or using the zmq.Poller object? Let me know in the comments below!