Parallelism and Concurrency in Python


In a real production environment, you have to take care of many factors. Combining parallelism and concurrency is a viable and helpful option.


In Python, you can nest thread pools inside each other. In other words, if you have a pool of threads and each of those threads has its own pool of threads, the pools cascade. This model is useful in many applications (e.g., web scraping).
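As a minimal Python 3 sketch of this cascading idea (using the standard `concurrent.futures` module rather than the hand-rolled pool shown later; the task functions here are hypothetical stand-ins):

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunk(chunk):
    # Inner task: stand-in for handling one chunk of a file.
    return chunk * 2

def process_file(chunks):
    # Each outer worker owns its own inner pool of threads.
    with ThreadPoolExecutor(max_workers=4) as inner:
        return sum(inner.map(process_chunk, chunks))

files = [[1, 2, 3], [4, 5], [6]]
with ThreadPoolExecutor(max_workers=2) as outer:
    totals = list(outer.map(process_file, files))
print(totals)  # [12, 18, 12]
```

Each element of `files` is handled by one outer thread, which in turn fans its chunks out to an inner pool.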


For example, suppose you have a text file that contains multiple links. You need to download each linked file; each file may be large and can be downloaded in several small chunks, and if a file is HTML, you also need to download the files it links to. Web scraping is the best example. In this article, we show an example with multiple thread pools: the links are read from a text file, and each chunk of each file is downloaded in a separate thread.

The code can be found here.

In a real production environment, you have to take care of many factors. For instance, if the file is large, you will download it in multiple chunks, and you have to check whether the server supports downloading parts of the file (HTTP range requests).
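A server advertises range support in its response headers; a small Python 3 helper (the header dict here stands in for a real response's headers) might look like:

```python
def server_supports_ranges(headers):
    # A server that honors partial downloads sends "Accept-Ranges: bytes".
    value = headers.get('Accept-Ranges', '').lower()
    return value == 'bytes'

print(server_supports_ranges({'Accept-Ranges': 'bytes'}))  # True
print(server_supports_ranges({'Accept-Ranges': 'none'}))   # False
print(server_supports_ranges({}))                          # False
```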

This article shows an example of combining parallelism and concurrency.

Parallelism

Parallelism means that all the threads start from the same point and each thread finishes independently of every other thread.

For example, let's discuss downloading links. Say that a program has a file containing a number of file links to be downloaded. The concept of parallelism is applied by applying thread pools and downloading each file link independently.
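Sketched with Python 3's `concurrent.futures` (the `download_link` function is a stand-in for a real HTTP download), the parallel part looks like:

```python
from concurrent.futures import ThreadPoolExecutor

def download_link(url):
    # Stand-in for a real download: each call runs independently of the
    # others, so the pool can work on all links at once.
    return 'saved:' + url

links = ['http://example.com/a.zip', 'http://example.com/b.zip']
with ThreadPoolExecutor(max_workers=len(links)) as pool:
    results = list(pool.map(download_link, links))
print(results)
```

`map` returns the results in the order of the input links, even though the downloads run concurrently.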

Concurrency

Concurrency, in this example, means that none of the working threads finishes the task until all the other threads finish. In other words, all the threads start and finish together.

Each thread that downloads a file has its own pool of threads that download parts of the file and combine these chunks into one file. Each sub-thread is responsible for downloading one part of the file, so no thread finishes its task until all the other threads have finished theirs and the file chunks have been accumulated into one file.

The sub-thread pool has a number of threads equal to the number of file parts.
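One way to compute that number, given a file size and a target chunk size (a Python 3 sketch; the function name is illustrative):

```python
import math

def parts_count(filesize, chunk_size):
    # One sub-thread per chunk; the last chunk may be smaller than the rest.
    return math.ceil(filesize / chunk_size)

print(parts_count(10_000_000, 4_000_000))  # 3
```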

Workers and Tasks


import multiprocessing
from ctypes import c_int
from Queue import Queue       # this module is named queue in Python 3
from threading import Thread

shared_bytes_var = multiprocessing.Value(c_int, 0) # a ctypes var that counts the bytes already downloaded

class ThreadPool:
    """ Pool of threads consuming tasks from a queue """
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """ Add a task to the queue """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """ Add a list of tasks to the queue """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """ Wait for completion of all the tasks in the queue """
        self.tasks.join()


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue """
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # An exception happened in this thread
                print e
            finally:
                # Mark this task as done, whether an exception happened or not
                self.tasks.task_done()

In the task file, we define the methods that will be executed by the thread pools. This file contains two methods:

  1. Download, which is executed by the first (parent) thread pool.

  2. DownloadChunk, which is executed by the sub-pool owned by each worker thread of the parent pool.

# settings and DownloaderHelper are the project's own modules.
import logging
import os
import socket
import urllib2

def DownloadChunk(url, path, startByte=0, endByte=None):
    """
        Function downloads a chunk of a file.
        @param url: File url address.
        @param path: Destination file path.
        @param startByte: Start byte.
        @param endByte: End byte. Will work only if the server supports HTTP Range headers.
        @return path: Destination file path.
    """
    url = url.replace(' ', '%20')
    filename = os.path.basename(path)   # local names used by the status line
    remotename = os.path.basename(url)
    headers = {'User-Agent': "Magic Browser"}
    if endByte is not None:
        headers['Range'] = 'bytes=%d-%d' % (startByte, endByte)
    req = urllib2.Request(url, headers=headers)
    try:
        urlObj = DownloaderHelper.urlopen_with_retry(req)
    except urllib2.HTTPError, e:
        if "HTTP Error 416" in str(e):
            # HTTP 416 Error: Requested Range Not Satisfiable. Happens when we ask
            # for a range that is not available on the server. It will happen when
            # the server tries to send us a .html page that means something like
            # "you opened too many connections to our server". If this happens, we
            # will wait for the other threads to finish their connections and try again.

            #log.warning("Thread didn't get the file it was expecting. Retrying...")
            print "\n"
            print "Thread didn't get the file it was expecting. Retrying..."
            return DownloadChunk(url, path, startByte, endByte)
        else:
            print "\n"
            print e
            raise e

    f = open(path, 'wb')
    meta = urlObj.info()
    try:
        filesize = int(meta.getheaders("Content-Length")[0])
    except IndexError:
        print "\n"
        print "Server did not send Content-Length."
        logging.error("Server did not send Content-Length.")
        raise  # without a length, progress cannot be reported below

    filesize_dl = 0
    block_sz = 8192
    while True:
        try:
            buff = urlObj.read(block_sz)
        except (socket.timeout, socket.error, urllib2.HTTPError), e:
            settings.shared_bytes_var.value -= filesize_dl
            print "\n"
            print 'Retrying to download chunk file..'
            DownloadChunk(url, path, startByte, endByte)
            raise e

        if not buff:
            break

        f.write(buff)
        filesize_dl += len(buff)
        try:
            settings.shared_bytes_var.value += len(buff)
        except AttributeError:
            print "\n"
            print 'Retrying to download chunk file..'
            DownloadChunk(url, path, startByte, endByte)

        settings.status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (
            filesize_dl / 1024.0 / 1024.0,
            filesize / 1024.0 / 1024.0,
            DownloaderHelper.progress_bar(1.0 * filesize_dl / filesize,
                                          remotename if (filesize_dl * 100.0 / filesize == 100) else filename),
            filesize_dl * 100.0 / filesize)
        settings.status += chr(8) * (len(settings.status) + 1)
        print settings.status,
    f.close()
    print "\n"
    return path
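For reference, building the same ranged request in Python 3, where `urllib.request` replaces `urllib2`, can be sketched as follows (no network call is made; we only construct the request object):

```python
import urllib.request

def build_chunk_request(url, start_byte, end_byte=None):
    # The Range header asks the server for bytes [start_byte, end_byte];
    # it only has an effect when the server supports HTTP range requests.
    headers = {'User-Agent': 'Magic Browser'}
    if end_byte is not None:
        headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte)
    return urllib.request.Request(url.replace(' ', '%20'), headers=headers)

req = build_chunk_request('http://example.com/big file.zip', 0, 8191)
print(req.get_header('Range'))  # bytes=0-8191
print(req.full_url)             # http://example.com/big%20file.zip
```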

def download(url, path=None, processes=8, minChunkFile=1024 * 1024):
    """
        Function downloads a file in parallel chunks.
        @param url: File url address.
        @param path: Destination file path (a temp path is generated if omitted).
        @param processes: Number of chunks to split the file into (default assumed).
        @param minChunkFile: Minimum chunk file size in bytes (default assumed).
        @return: 1 on success.
    """
    settings.shared_bytes_var.value = 0
    url = url.replace(' ', '%20')
    if not path:
        path = DownloaderHelper.get_rand_filename(os.environ['temp'])
    if not os.path.exists(os.path.dirname(path)):
        os.makedirs(os.path.dirname(path))
    filename = os.path.basename(path)
    tempfilelist = []

    req = urllib2.Request(url, headers={'User-Agent': "Magic Browser", "Connection": "keep-alive"})
    urlObj = DownloaderHelper.urlopen_with_retry(req)
    meta = urlObj.info()
    filesize = int(meta.getheaders("Content-Length")[0])

    if (filesize / processes > minChunkFile) and DownloaderHelper.Is_ServerSupportHTTPRange(url):
        args1 = []
        pos = 0
        chunk = filesize / processes
        for i in range(processes):
            startByte = pos
            endByte = pos + chunk
            if endByte > filesize - 1:
                endByte = filesize - 1
            args1.append([url, path + ".%.3d" % i, startByte, endByte])
            tempfilelist.append(path + ".%.3d" % i)
            pos += chunk + 1
    else:
        # The file is small or the server does not support ranges: one chunk.
        args1 = [[url, path + ".000", None, None]]
        tempfilelist.append(path + ".000")

    #print 'Downloading... ', filename
    # Thread pool for handling the download of each chunk of the file
    pool2 = ThreadPool(settings.no_thread_per_file)
    pool2.map(lambda x: DownloadChunk(*x), args1)
    while pool2.tasks.unfinished_tasks:
        settings.status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (
            settings.shared_bytes_var.value / 1024.0 / 1024.0,
            filesize / 1024.0 / 1024.0,
            DownloaderHelper.progress_bar(1.0 * settings.shared_bytes_var.value / filesize, filename),
            settings.shared_bytes_var.value * 100.0 / filesize)
        settings.status += chr(8) * (len(settings.status) + 1)
        print settings.status,
    pool2.wait_completion()

    file_parts = tempfilelist
    DownloaderHelper.combine_files(file_parts, filename)
    return 1
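The range arithmetic inside download can be isolated and checked on its own; a Python 3 sketch mirroring the pos/chunk loop (the function name is illustrative):

```python
def split_ranges(filesize, processes):
    # Each part gets roughly filesize/processes bytes; ranges are
    # inclusive on both ends, matching the HTTP Range header semantics.
    chunk = filesize // processes
    ranges, pos = [], 0
    for _ in range(processes):
        end = min(pos + chunk, filesize - 1)
        ranges.append((pos, end))
        pos += chunk + 1
    return ranges

print(split_ranges(100, 4))  # [(0, 25), (26, 51), (52, 77), (78, 99)]
```

Note the ranges are contiguous and together cover bytes 0 through filesize - 1 exactly once.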


if __name__ == "__main__":
    print 'starting.....'
    lines = [line.rstrip('\n') for line in open('links.txt')]
    pool = ThreadPool(4)  # parent pool: one worker per link (pool size assumed)
    pool.map(Tasks.download, lines)
    pool.wait_completion()
    print 'Total files downloaded', settings.download_counter
    logging.info('Total files downloaded %d', settings.download_counter)
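The combine_files helper used above is part of the project's DownloaderHelper module; a plausible Python 3 sketch of what it does (concatenate the chunk files in order, then remove them) is:

```python
import os
import tempfile

def combine_files(parts, destination):
    # Concatenate chunk files in order into one file, then delete the parts.
    with open(destination, 'wb') as out:
        for part in parts:
            with open(part, 'rb') as f:
                out.write(f.read())
            os.remove(part)

# Demo with two fake chunk files in a temp directory.
tmp = tempfile.mkdtemp()
parts = []
for i, data in enumerate([b'hello ', b'world']):
    p = os.path.join(tmp, 'file.%.3d' % i)
    with open(p, 'wb') as f:
        f.write(data)
    parts.append(p)
combine_files(parts, os.path.join(tmp, 'file'))
print(open(os.path.join(tmp, 'file'), 'rb').read())  # b'hello world'
```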

Now you know how to combine parallelism and concurrency in Python!


Opinions expressed by DZone contributors are their own.
