Python 201: A Multiprocessing Tutorial

How to get started using the multiprocessing module in Python, which lets you avoid the GIL and take full advantage of multiple processors on a machine.

The multiprocessing module was added to Python in version 2.6. It was originally defined in PEP 371 by Jesse Noller and Richard Oudkerk. The multiprocessing module allows you to spawn processes in much the same manner as you can spawn threads with the threading module. Because each process runs its own Python interpreter with its own Global Interpreter Lock (GIL), spawning processes lets you sidestep the GIL and take full advantage of multiple processors on a machine.

The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize executing a function across multiple inputs. We will be looking at Pool in a later section. We will start with the multiprocessing module’s Process class.

Getting Started With Multiprocessing

The Process class is very similar to the threading module’s Thread class. Let’s try creating a series of processes that call the same function and see how that works:

import os
 
from multiprocessing import Process
 
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
 
    for proc in procs:
        proc.join()

For this example, we import Process and create a doubler function. Inside the function, we double the number that was passed in. We also use Python’s os module to get the current process’s ID (or pid). This will tell us which process is calling the function. Then in the block of code at the bottom, we create a series of Processes and start them. The very last loop just calls the join() method on each process, which tells Python to wait for the process to terminate. If you need to stop a process, you can call its terminate() method.
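The terminate() method mentioned above can be sketched as follows. This is a minimal illustration rather than a recommended shutdown strategy: the sleeper function and the stop_early helper are hypothetical names made up for this example. Keep in mind that terminate() stops the child abruptly, so finally clauses and exit handlers in the child will not run.

```python
import time
from multiprocessing import Process


def sleeper(seconds):
    """Hypothetical long-running task used only for this sketch."""
    time.sleep(seconds)


def stop_early():
    """Start a worker, terminate it, and report its final state."""
    proc = Process(target=sleeper, args=(60,))
    proc.start()
    proc.terminate()  # ask the operating system to stop the child
    proc.join()       # wait until the child has actually exited
    return proc.is_alive(), proc.exitcode


if __name__ == '__main__':
    alive, exitcode = stop_early()
    print('alive: {0}, exitcode: {1}'.format(alive, exitcode))
```

After join() returns, is_alive() reports False, and the exit code is non-zero because the process did not finish on its own.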

When you run this code, you should see output that is similar to the following:

5 doubled to 10 by process id: 10468
10 doubled to 20 by process id: 10469
15 doubled to 30 by process id: 10470
20 doubled to 40 by process id: 10471
25 doubled to 50 by process id: 10472

Sometimes it’s nicer to have a more human-readable name for your process, though. Fortunately, the Process class does allow you to access the name of your process. Let’s take a look:

import os
 
from multiprocessing import Process, current_process
 
 
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
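    # Note: this process is never started, but creating it still uses up
    # the default name "Process-1", so the loop below starts at "Process-2".
    proc = Process(target=doubler, args=(5,))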
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
 
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)
 
    for proc in procs:
        proc.join()

This time around, we import something extra: current_process. The current_process function is basically the same thing as the threading module’s current_thread. We use it to grab the name of the process that is calling our function. You will note that for the first five processes, we don’t set a name. Then for the sixth, we set the process name to “Test”. Let’s see what we get for output:

5 doubled to 10 by: Process-2
10 doubled to 20 by: Process-3
15 doubled to 30 by: Process-4
20 doubled to 40 by: Process-5
25 doubled to 50 by: Process-6
2 doubled to 4 by: Test

The output demonstrates that the multiprocessing module assigns a number to each process as a part of its name by default. Of course, when we specify a name, a number isn’t going to get added to it.
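The name is also available from the parent side, along with a couple of other attributes of the Process object. Here is a small sketch, assuming a do-nothing target called noop and a helper called describe (both hypothetical names for this example), that reads name, pid and exitcode before the process starts and after it finishes:

```python
from multiprocessing import Process


def noop():
    """Placeholder target; it does nothing."""
    pass


def describe():
    """Return the process attributes before start() and after join()."""
    proc = Process(target=noop, name='Test')
    # Before start(), there is no pid and no exit code yet
    before = (proc.name, proc.pid, proc.exitcode)
    proc.start()
    proc.join()
    # After join(), the pid is set and the exit code is known
    after = (proc.name, proc.pid is not None, proc.exitcode)
    return before, after


if __name__ == '__main__':
    before, after = describe()
    print(before)
    print(after)
```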

Locks

The multiprocessing module supports locks in much the same way as the threading module does. All you need to do is import Lock, acquire it, do something and release it. Let’s take a look:

from multiprocessing import Process, Lock
 
lock = Lock()
 
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()

Here we create a simple printing function that prints whatever you pass to it. To prevent the processes from interfering with each other, we use a Lock object. This code will loop over our list of three items and create a process for each of them. Each process will call our function and pass it one of the items from the list. Because we’re using locks, the next process in line will wait for the lock to release before it can continue.
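One caveat worth knowing: on platforms that start child processes with the spawn method (Windows, and macOS by default since Python 3.8), each child re-imports the module, so a Lock created at module level is not guaranteed to be the same lock in every process. A variation that sidesteps this is to create the lock in the parent and pass it to each process as an argument. The sketch below assumes a helper named run, which is a hypothetical name for this example, and uses the lock as a context manager instead of calling acquire() and release() by hand:

```python
from multiprocessing import Process, Lock


def printer(lock, item):
    """Print the item while holding the shared lock."""
    with lock:  # acquired on entry, released on exit
        print(item)


def run(items):
    """Start one process per item and return their exit codes."""
    lock = Lock()  # created once in the parent and handed to every child
    procs = [Process(target=printer, args=(lock, item)) for item in items]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    return [proc.exitcode for proc in procs]


if __name__ == '__main__':
    run(['tango', 'foxtrot', 10])
```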

Logging

Logging processes is a little different than logging threads. The reason for this is that Python’s logging package doesn’t use process-shared locks, so it’s possible for you to end up with messages from different processes getting mixed up. Let’s try adding basic logging to the previous example. Here’s the code:

import logging
import multiprocessing
 
from multiprocessing import Process, Lock
 
lock = Lock()
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()

The simplest way to log is to send it all to stderr. We can do this by calling the log_to_stderr() function. Then we call the get_logger function to get access to a logger and set its logging level to INFO. The rest of the code is the same. When you do this, you should get output like the following:

[INFO/Process-1] child process calling self.run()
tango
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/MainProcess] process shutting down
foxtrot
[INFO/Process-2] process shutting down
[INFO/Process-3] child process calling self.run()
[INFO/Process-2] process exiting with exitcode 0
10
[INFO/MainProcess] calling join() for process Process-3
[INFO/Process-3] process shutting down
[INFO/Process-3] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-2

Now if you want to save the log to disk, then it gets a little trickier. Let’s find out how by looking at the next example:

import logging
import multiprocessing
 
from multiprocessing import Process, Lock
 
lock = Lock()
 
def create_logger():
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
 
    fh = logging.FileHandler("process.log")
 
    fmt = '%(asctime)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)
 
    logger.addHandler(fh)
    return logger
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    logger = create_logger()
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()

Here we just set up a create_logger() function and put the multiprocessing module’s call to get_logger inside of it. Now that we have a logger object, we can add a FileHandler and even a Formatter, which we do. Now if you run the code, the logging data will be saved to disk and you should only see the items passed to the printer function getting printed to stdout. On my machine, the log ended up containing the following:

2016-07-28 16:23:28,877 - INFO - child process calling self.run()
2016-07-28 16:23:28,878 - INFO - process shutting down
2016-07-28 16:23:28,878 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,878 - INFO - child process calling self.run()
2016-07-28 16:23:28,878 - INFO - process shutting down
2016-07-28 16:23:28,878 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,879 - INFO - process shutting down
2016-07-28 16:23:28,879 - INFO - child process calling self.run()
2016-07-28 16:23:28,879 - INFO - calling join() for process Process-3
2016-07-28 16:23:28,879 - INFO - process shutting down
2016-07-28 16:23:28,879 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,880 - INFO - calling join() for process Process-2

Your log file will look different. Each time I ran this code, the order of process execution changed slightly.
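If you want your own log messages from several processes to end up in one place without getting mixed up, one option from the standard library is to have every worker send its records through a queue to a single listener in the parent. This is a different technique from the get_logger() approach shown above. It is only a sketch: the worker and run functions and the 'worker' logger name are hypothetical, and it relies on logging.handlers.QueueHandler and QueueListener, which are available in Python 3.2 and later.

```python
import logging
import logging.handlers
from multiprocessing import Process, Queue


def worker(log_queue, item):
    """Send one log record to the parent through the queue."""
    logger = logging.getLogger('worker')
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.info('processing %s', item)


def run(items, handler):
    """Run one worker per item; only the listener touches the handler."""
    log_queue = Queue()
    procs = [Process(target=worker, args=(log_queue, item)) for item in items]
    for proc in procs:
        proc.start()
    # The listener can start after the workers: records wait in the
    # queue until it reads them.
    listener = logging.handlers.QueueListener(log_queue, handler)
    listener.start()
    for proc in procs:
        proc.join()
    listener.stop()  # handles the records still queued, then stops


if __name__ == '__main__':
    fh = logging.FileHandler('process.log')
    fh.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s'))
    run(['tango', 'foxtrot', 10], fh)
```

Because a single thread in the parent does all the writing, the lines in the file stay intact even when the workers log at the same time.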

The Pool Class

The Pool class is used to represent a pool of worker processes. It has methods which can allow you to offload tasks to the worker processes. Let’s look at a really simple example:

from multiprocessing import Pool
 
def doubler(number):
    return number * 2
 
if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))

Basically what’s happening here is that we create an instance of Pool and tell it to create three worker processes. Then we use the map method to apply the function to each item in the iterable, with the work spread across the worker processes. Finally we print the result, which in this case is actually a list: [10, 20, 40].
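The example above never shuts the pool down. In Python 3.3 and later, a Pool can be used as a context manager, which calls terminate() on the pool when the block exits. Here is the same idea written that way, as a sketch with a hypothetical helper named double_all:

```python
from multiprocessing import Pool


def doubler(number):
    return number * 2


def double_all(numbers, workers=3):
    """Double every number using a pool that is cleaned up automatically."""
    with Pool(processes=workers) as pool:
        # map() blocks until every result is ready and keeps input order
        return pool.map(doubler, numbers)


if __name__ == '__main__':
    print(double_all([5, 10, 20]))
```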

You can also get the result of your process in a pool by using the apply_async method:

from multiprocessing import Pool
 
def doubler(number):
    return number * 2
 
if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler, (25,))
    print(result.get(timeout=1))

What this allows us to do is actually ask for the result of the process. That is what the get function is all about. It tries to get our result. You will note that we also have a timeout set just in case something happened to the function we were calling. We don’t want it to block indefinitely after all.
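To make the timeout behavior more concrete, here is a sketch that submits several tasks with apply_async and also shows what happens when a task takes longer than the timeout. The slow_doubler, collect and try_slow names are hypothetical and exist only for this example. When the timeout expires, get() raises multiprocessing.TimeoutError:

```python
import multiprocessing
import time
from multiprocessing import Pool


def doubler(number):
    return number * 2


def slow_doubler(number):
    """Hypothetical slow task used to trigger the timeout."""
    time.sleep(5)
    return number * 2


def collect(numbers):
    """Submit every number first, then gather the results in order."""
    with Pool(processes=3) as pool:
        pending = [pool.apply_async(doubler, (n,)) for n in numbers]
        return [result.get(timeout=10) for result in pending]


def try_slow(number, timeout=0.1):
    """Return the result, or None if it is not ready within the timeout."""
    with Pool(processes=1) as pool:
        result = pool.apply_async(slow_doubler, (number,))
        try:
            return result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            return None


if __name__ == '__main__':
    print(collect([5, 10, 20]))
    print(try_slow(25))
```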

Process Communication

When it comes to communicating between processes, the multiprocessing module has two primary methods: Queues and Pipes. The Queue implementation is actually both thread and process safe. Let’s take a look at a fairly simple example that’s based on the Queue code from one of my threading articles:

from multiprocessing import Process, Queue
 
sentinel = -1
 
def creator(data, q):
    """
    Creates data to be consumed and puts it on the queue
    """
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)


def my_consumer(q):
    """
    Consumes some data and works on it

    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        # Compare by value and stop before processing the sentinel
        if data == sentinel:
            break

        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print(processed)
 
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()
 
    q.close()
    q.join_thread()
 
    process_one.join()
    process_two.join()

Here we just need to import Queue and Process. Then we define two functions: one to create data and add it to the queue, and a second to consume the data and process it. Adding data to the Queue is done by using the Queue’s put() method, whereas getting data from the Queue is done via the get() method. The last chunk of code just creates the Queue object and a couple of Processes and then runs them. You will note that we call join() on our process objects rather than the Queue itself.
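Pipes were mentioned above but not shown, so here is a minimal sketch of one. Pipe() returns a pair of connection objects, and by default both ends can send and receive. The double_via_pipe helper and the use of None as a stop signal are choices made for this example only:

```python
from multiprocessing import Process, Pipe


def doubler(conn):
    """Read numbers from the pipe and send back each one doubled."""
    while True:
        number = conn.recv()
        if number is None:  # None is the stop signal chosen for this sketch
            break
        conn.send(number * 2)
    conn.close()


def double_via_pipe(numbers):
    """Send each number to the child and collect the doubled replies."""
    parent_conn, child_conn = Pipe()  # two-way (duplex) by default
    proc = Process(target=doubler, args=(child_conn,))
    proc.start()
    results = []
    for number in numbers:
        parent_conn.send(number)
        results.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to stop
    proc.join()
    return results


if __name__ == '__main__':
    print(double_via_pipe([5, 10, 13]))
```

A Pipe connects exactly two endpoints, so it suits a simple back-and-forth like this one, while a Queue is the better fit when several producers or consumers are involved.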

Wrapping Up

We have covered a lot of material here. You have learned how to use the multiprocessing module to target regular functions, name processes, communicate between processes using Queues and much more. There is also a lot more in the Python documentation that isn’t even touched in this article, so be sure to dive into that as well. In the meantime, you now know how to utilize all your computer’s processing power with Python!

Published at DZone with permission of Mike Driscoll, DZone MVB. See the original article here.
