There are limits on the speedup offered by multiprocessing. Once utilization reaches 100% × cores, we can't go any faster. However, many processes spend most of their time waiting on I/O or network access; for these, we can use Python's multiprocessing module to make more effective use of our CPU.
The easiest approach to multiprocessing is to use the shell's pipeline philosophy. Break the processing up into small steps, each of which reads from a source stream and writes to an output stream. The long-standing tradition here is to read from `sys.stdin` and write to `sys.stdout`. The multiprocessing module gives us tools to achieve this with relatively little pain.
Rather than use a simple pipe, however, we need to use a `multiprocessing.Queue`. In shell parlance we might have `func1 | func2 | func3`.
For multiprocessing purposes, we'd have something a hair more complex-looking.

    from multiprocessing import Process, Queue

    q1 = Queue()
    q2 = Queue()
    # Each stage is handed the queue it reads from and the queue it writes to.
    p1 = Process(target=func1, kwargs=dict(output=q1))
    p2 = Process(target=func2, kwargs=dict(input=q1, output=q2))
    p3 = Process(target=func3, kwargs=dict(input=q2))
    p1.start()
    p2.start()
    p3.start()

While wordy, this hints at a more general approach to having three processes pass data to one another.
Termination
The issue is one of termination. Most multiprocessing packages (like multiprocessing and celery) presume that your processing pipeline has a fairly long lifetime. Because of this, they presume that you can determine that it's idle and kill it off one process at a time.
This isn't a bad assumption, and probably covers a large number of use cases.
It doesn't, however, cover the simple shell-like `func1 | func2 | func3` use case very well at all, because we can't easily tell when a queue is shut down for good. A producing process can close a queue, but that piece of information never shows up at the consumer end of the queue.
Queues are designed to be durable and to have multiple producers. There's no easy way to know a Queue is no longer needed. Each producer would have to attempt to close the Queue, and the Queue would have to know the intended number of producers. If processes are dynamic, then the number of producers may not have a fixed, known-in-advance limit.
The approach, therefore, is to put a sentinel object in the queue. This way, a consumer knows that production has finished. It can release resources and exit politely.
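The sentinel idea can be sketched for a single-producer, single-consumer pipeline as follows. This is a minimal illustration, not the code behind this post: `SENTINEL`, `produce`, `square`, and `run_pipeline` are hypothetical names, `None` is assumed never to occur as real data, and the `ctx` parameter is only a convenience for passing in a specific multiprocessing context.

```python
import multiprocessing

SENTINEL = None  # assumption: None never appears as a real data item


def produce(items, output_queue):
    """First stage: emit every item, then the sentinel."""
    for item in items:
        output_queue.put(item)
    output_queue.put(SENTINEL)


def square(input_queue, output_queue):
    """Middle stage: work until the sentinel arrives, then pass it on."""
    # iter(callable, sentinel) keeps calling get() until it returns SENTINEL.
    for item in iter(input_queue.get, SENTINEL):
        output_queue.put(item * item)
    output_queue.put(SENTINEL)


def run_pipeline(items, ctx=multiprocessing):
    """Run produce | square and collect the results in the parent."""
    q1, q2 = ctx.Queue(), ctx.Queue()
    p1 = ctx.Process(target=produce, kwargs=dict(items=items, output_queue=q1))
    p2 = ctx.Process(target=square, kwargs=dict(input_queue=q1, output_queue=q2))
    p1.start()
    p2.start()
    # The parent acts as the final consumer; drain before joining.
    results = list(iter(q2.get, SENTINEL))
    p1.join()
    p2.join()
    return results


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3]))
```

Each stage forwards the sentinel only after its last real result, so a downstream consumer that sees the sentinel can be sure nothing more is coming from that producer.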
Fan-Out and Fan-In
The problem with a sentinel on a multi-producer queue is that there will be multiple sentinels, one from each producer. And, of course, with a multi-consumer queue, there must be one sentinel for each consumer.
If producers adhere to a sentinel-per-consumer rule, and consumers know to expect a sentinel-per-producer, then we can easily create dynamic multiprocessing networks that start up and shut down quickly and cleanly.
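The two counting rules might look like this in practice. The sketch below is an assumption-laden illustration: `producer`, `worker`, `collect`, and `fan_out_fan_in` are hypothetical names, the "work" is just doubling a number, and `None` is assumed to be safe as a sentinel.

```python
import multiprocessing

SENTINEL = None  # assumption: None never appears as a real data item


def producer(items, output_queue, consumers):
    """Emit all items, then one sentinel per consumer of the queue."""
    for item in items:
        output_queue.put(item)
    for _ in range(consumers):
        output_queue.put(SENTINEL)


def worker(input_queue, output_queue, producers, consumers):
    """Consume until one sentinel per producer has been seen."""
    remaining = producers
    while remaining:
        item = input_queue.get()
        if item is SENTINEL:
            remaining -= 1
        else:
            output_queue.put(item * 2)  # stand-in for real work
    for _ in range(consumers):
        output_queue.put(SENTINEL)


def collect(input_queue, producers):
    """Fan-in: gather results until every producer has signalled the end."""
    results, remaining = [], producers
    while remaining:
        item = input_queue.get()
        if item is SENTINEL:
            remaining -= 1
        else:
            results.append(item)
    return results


def fan_out_fan_in(items, n_workers=4, ctx=multiprocessing):
    work_q, result_q = ctx.Queue(), ctx.Queue()
    source = ctx.Process(target=producer, args=(items, work_q, n_workers))
    workers = [
        ctx.Process(target=worker, args=(work_q, result_q, 1, 1))
        for _ in range(n_workers)
    ]
    source.start()
    for w in workers:
        w.start()
    results = collect(result_q, producers=n_workers)  # drain before joining
    source.join()
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    print(sorted(fan_out_fan_in(list(range(10)))))
```

Because the workers compete for items, results arrive in no particular order; the usage line sorts them only to make the output stable.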
Use Case
Here's a use case. We want to do whois analysis on IP addresses in a log.
If we have a simple loop to parse the log and do a whois request on each host IP address, the processing will be slow. It uses approximately no CPU, since it spends almost all of its time waiting for input from the log, waiting for whois, or waiting for buffers to be written to the output file.
If we make a simple three-step pipeline (parse | whois | report) then we get some improvement in elapsed time, but -- really -- the whois step is killing the throughput.
What we need is a way to run a dozen whois requests concurrently. This leads us to multiprocessing, fan-out and fan-in.
Here's what we want.

    def analyze_ip(logs):
        user_queue = Queue()
        report_queue = Queue()

        # Fan-out: one producer feeds a dozen whois workers.
        user_from_log = ProducerProcess(
            name='book_users', target=book_users, args=(logs,),
            output_queue=user_queue, consumers=12)
        user_from_log.start()

        workers = []
        for worker in range(12):
            get_details = ConsumerProducerProcess(
                name='user_whois', target=user_whois, kwargs=dict(LIVE=False),
                input_queue=user_queue, output_queue=report_queue,
                producers=1, consumers=1)
            get_details.start()
            workers.append(get_details)

        # Fan-in: a single consumer expects one sentinel from each worker.
        report = ConsumerProcess(
            name='final_report', target=final_report,
            input_queue=report_queue, producers=12)
        report.start()

        user_from_log.join()
        for w in workers:
            w.join()
        report.join()

This will do a number of concurrent whois requests, tying up lots and lots of resources and (hopefully) saturating the CPU with real work.
This shows a fan-out from one ProducerProcess to a dozen ConsumerProducerProcess instances. It shows a fan-in from the ConsumerProducerProcess to a single ConsumerProcess that writes the final report.
This is trivially scaled up (or down) by changing the number of processes in the middle.
What's important is that the actual functions involved (`book_users`, `user_whois` and `final_report`) are relatively trivial generator functions that consume source data (log files or queue entries) and produce results (queue entries or a report file).
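One way such a separation could work is sketched below. This is a guess at the general shape, not the actual `ConsumerProducerProcess` implementation: `drain`, `run_stage`, `shout`, and `demo` are hypothetical names, and `None` is assumed to be safe as a sentinel. The point is that the generator function sees only an iterable and never touches a queue or a sentinel.

```python
import multiprocessing

SENTINEL = None  # assumption: None never appears as a real data item


def drain(queue, producers):
    """Yield queue entries until one sentinel per producer has been seen."""
    remaining = producers
    while remaining:
        item = queue.get()
        if item is SENTINEL:
            remaining -= 1
        else:
            yield item


def run_stage(target, input_queue, output_queue, producers, consumers):
    """Process body: pump the generator's results into the output queue."""
    for result in target(drain(input_queue, producers)):
        output_queue.put(result)
    for _ in range(consumers):  # one sentinel per downstream consumer
        output_queue.put(SENTINEL)


def shout(source):
    """A plain generator function; it knows nothing about queues."""
    for text in source:
        yield text.upper()


def demo(ctx=multiprocessing):
    in_q, out_q = ctx.Queue(), ctx.Queue()
    stage = ctx.Process(target=run_stage, args=(shout, in_q, out_q, 1, 1))
    stage.start()
    for word in ["alpha", "beta"]:
        in_q.put(word)
    in_q.put(SENTINEL)  # the parent is the only producer
    results = list(drain(out_q, producers=1))
    stage.join()
    return results


if __name__ == "__main__":
    print(demo())
```

Keeping the queue and sentinel bookkeeping in a wrapper means the same generator function can be tested on an ordinary list without starting any processes.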
Also important is the fact that it closes down cleanly. When the input reaches end-of-file, sentinel values are put into the queues to trickle through and lead to orderly process shutdown.