Gevent, ZeroMQ, WebSockets, and Flot FTW!
Join the DZone community and get the full member experience.
Join For FreeAs part of the work I've been doing on Zarkov I've had the opportunity to play around with a lot of cool technologies, among which are gevent, ZeroMQ, WebSockets, and Flot. It took a while to get to the point where I could actually get things done, but once I was there, things were incredibly simple. In this post I'll show you how to use these three techologies together to build a simple web app with realtime server push data.
Prerequisites
Before you can run the example program, you'll need to install the following Python libraries:
- pyzmq
- gevent
- gevent-zeromq
- gevent-websocket
- paste (for a simple WSGI http static file server)
The easiest way to do this is to:
pip install pyzmq gevent gevent_zeromq paste
You may see messages about missing system libraries; you'll need to have libzmq-dev 2.1+ and libevent-dev installed to build the Python packages.
You'll also need to download Flot JQuery plugin and untar it in your project directory.
Greenlets
One of the things you'll need to understand before running the code is the concept of green threads or greenlets. These behave just like threads except for two caveats:
- they never run concurrently with one another
- they only yield to one another at specified points
What this ends up meaning is that they're very lightweight to
create compared to threads and you can actually be a little sloppier
with shared data between greenlets since you can deterministically know
when a greenlet might yield to another greenlet. The gevent library
provides our greenlet support, and makes greenlets even more useful by
turning blocking system calls into yield points so that another greenlet
can run while one is waiting on data.
ZeroMQ
ZeroMQ sounds like it's a message queue like RabbitMQ, but it's really not. ZeroMQ is a library that provides many message queue-like features, but requires no broker and very little setup. We'll be using ZeroMQ pub/sub sockets to manage communication in this demo. Pub/sub sockets have the nice feature that published messages get distributed to all subscribers currently connected, or if no subscribers are connected, the message simply gets dropped.
ZeroMQ was designed to work with threads rather than greenlets, so we'll also need to use the excellent gevent_zeromq library to "greenletize" ZeroMQ. For this example, we'll write a loop that pushes (x,y) point values out on a zmq.PUB socket:
def zmq_producer(context): '''Produce a nice time series sine wave''' socket = context.socket(zmq.PUB) socket.connect('tcp://127.0.0.1:5000') while True: x = time.time() y = 2.5 * (1 + math.sin(x / 500)) socket.send(json.dumps(dict(x=x, y=y))) gevent.sleep(0.1)
The context being passed in is created in our main() function with the call zmq.Context(). The context is just a place for ZeroMQ to store some global state; you generally only create one per application.
OK, now that we have our producer, let's look at the ZeroMQ server. What we'll do here is just relay messages received on an incoming tcp zmq.SUB socket and publish them on an outgong inproc zmq.PUB socket:
def zmq_server(context): '''Funnel messages coming from the external tcp socket to an inproc socket''' sock_incoming = context.socket(zmq.SUB) sock_outgoing = context.socket(zmq.PUB) sock_incoming.bind('tcp://*:5000') sock_outgoing.bind('inproc://queue') sock_incoming.setsockopt(zmq.SUBSCRIBE, "") while True: msg = sock_incoming.recv() sock_outgoing.send(msg)
The only thing to note here is that we have to tell ZeroMQ that we want to subscribe to all messages on the zmq.SUB socket.
Gevent WSGI Servers
Gevent provides a fast implementation of the WSGI standard in the gevent.pywsgi module. We'll be using two instances of the server (greenlets are lightweight) in this demo. One will simply serve up static files, while the other one will provide our WebSocket connectivity. Here's the code to set up these two servers:
# websocket server: copies inproc zmq messages to websocket ws_server = gevent.pywsgi.WSGIServer( ('', 9999), WebSocketApp(context), handler_class=WebSocketHandler) # http server: serves up static files http_server = gevent.pywsgi.WSGIServer( ('', 8000), paste.urlparser.StaticURLParser(os.path.dirname(__file__)))
Pretty simple; just specify the bind address for the server, the WSGI
app to call, and in the case of the WebSocket server, the WebSocket
handler class. The http_server above is mostly
uninteresting, as it just serves up static files via paste. The
ws_server has a (slightly) more interesting implementation:
class WebSocketApp(object): '''Funnel messages coming from an inproc zmq socket to the websocket''' def __init__(self, context): self.context = context def __call__(self, environ, start_response): ws = environ['wsgi.websocket'] sock = self.context.socket(zmq.SUB)/span> sock.setsockopt(zmq.SUBSCRIBE, "") sock.connect('inproc://queue') while True: msg = sock.recv() ws.send(msg)
What's happening here is that when we get a connection to the websocket address (port 9999 in our example), we do the following:
- Subscribe to the inproc socket that our zmq_server is publishing messages to
- Grab the websocket from the environ
- Relay messages from the zmq socket to the websocket
Client-side
So the server-side stuff is straightforward; what about the client-side? Well, it's pretty easy as well. Here's the (static) HTML page we'll use:
<html> <head> <title>ZWS Example</title> <script type="text/javascript" src="/flot/jquery.min.js"></script> <script type="text/javascript" src="/flot/jquery.flot.min.js"></script> <script type="text/javascript" src="graph.js"></script> </head> <body> <h1>ZMQ - WebSocket Example</h1> <div id="conn_status">Not Connected</div> <div id="placeholder" style="width:600px;height:300px;"></div> </body> </html>
Here, all we're doing is pulling in the JQuery and Flot libraries as
well as our custom graph.js and setting up a couple of placeholders. The
Javascript is also pretty straightforward. I've tried to provide inline
commentary:
$(function() { // Open up a connection to our server var ws = new WebSocket("ws://localhost:9999/"); // Save our plot placeholder var $placeholder = $('#placeholder'); // Maximum # of data points to plot var datalen = 100; // This will be the plot object var plot = null; // Set up some options on our data series var series = { label: "Value", lines: { show: true, fill: true }, points: { show:true }, data: [] }; // What do we do when we get a message? ws.onmessage = function(evt) { var d = $.parseJSON(evt.data); series.data.push([d.x, d.y]); // Keep the data series a manageable length while (series.data.length > datalen) { series.data.shift(); } if(plot) { // Create the plot if it's not there already plot.setData([series]); plot.setupGrid(); plot.draw(); } else if(series.data.length > 10) { // Update the plot plot = $.plot($placeholder, [series], { xaxis:{ mode: "time", timeformat: "%H:%M:%S", minTickSize: [2, "second"], }, yaxis: { min: 0, max: 5 } }); plot.draw(); } } // Just update our conn_status field with the connection status ws.onopen = function(evt) { $('#conn_status').html('<b>Connected</b>'); } ws.onerror = function(evt) { $('#conn_status').html('<b>Error</b>'); } ws.onclose = function(evt) { $('#conn_status').html('<b>Closed</b>'); } });
Putting it all together
To put everything together, here's the main function I used:
def main(): '''Set up zmq context and greenlets for all the servers, then launch the web browser and run the data producer''' context = zmq.Context() # zeromq: tcp to inproc gateway gevent.spawn(zmq_server, context) # websocket server: copies inproc zmq messages to websocket ws_server = gevent.pywsgi.WSGIServer( ('', 9999), WebSocketApp(context), handler_class=WebSocketHandler) # http server: serves up static files http_server = gevent.pywsgi.WSGIServer( ('', 8000), paste.urlparser.StaticURLParser(os.path.dirname(__file__))) # Start the server greenlets http_server.start() ws_server.start() # Open a couple of webbrowsers webbrowser.open('http://localhost:8000/graph.html') webbrowser.open('http://localhost:8000/graph.html') # Kick off the producer zmq_producer(context)
For fun, I threw a couple of webbrowser calls at the end so you can
see the data getting distributed to all the clients that connect to our
server. If you'd like to see the full program including flot et. al.,
here's a download. Hope you enjoy playing around with gevent, ZeroMQ, and WebSockets as much as I did!
Source: http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html
Opinions expressed by DZone contributors are their own.
Comments