
Parallelism and Concurrency in Python


In a real production environment, you have to take care of many factors. Combining parallelism and concurrency is a viable and helpful option.


In Python, you can have multiple thread pools nested inside each other. In other words, if you have a pool of threads and each thread has its own pool of threads, these pools cascade. This model is useful in many applications (e.g., web scraping).


For example, suppose you have a text file that contains multiple links to large files. Each file can be downloaded in several small chunks, and if a file is an HTML page, the files it links to may need to be downloaded as well. Web scraping is the classic case. In this article, we will show an example with multiple thread pools: an outer pool reads the links from a text file and downloads each file, while an inner pool downloads each chunk of that file in a separate thread.
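As a minimal sketch of the idea (this uses the ThreadPool class defined later in this article; handle_file, download_one_chunk, chunk_args_for, and the pool sizes are hypothetical):

def handle_file(url):
    # Inner pool: one thread per chunk of this file.
    inner = ThreadPool(8)
    inner.map(download_one_chunk, chunk_args_for(url))  # hypothetical helpers
    inner.wait_completion()

# Outer pool: one thread per file link.
outer = ThreadPool(4)
outer.map(handle_file, urls)
outer.wait_completion()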

The code can be found here.

In a real production environment, you have to take care of many factors. For instance, if the file is large, you will download it in multiple chunks, and you have to check whether the server supports serving parts of a file.
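The article's DownloaderHelper.Is_ServerSupportHTTPRange helper (used in the download method below) is not listed, so the following is only a sketch of how such a check might work: ask for a single byte and see whether the server answers with 206 Partial Content.

import urllib2

def Is_ServerSupportHTTPRange(url):
    """ Probe with a one-byte Range request; HTTP 206 (or an
        'Accept-Ranges: bytes' header) means partial downloads work. """
    req = urllib2.Request(url, headers={'User-Agent': 'Magic Browser',
                                        'Range': 'bytes=0-0'})
    try:
        urlObj = urllib2.urlopen(req)
    except urllib2.HTTPError:
        return False
    return (urlObj.getcode() == 206 or
            urlObj.info().getheader('Accept-Ranges', '').lower() == 'bytes')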

This article shows an example of combining parallelism and concurrency.

Parallelism

Parallelism means that all the threads start from the same point and each thread finishes independently of the other threads.

For example, let's discuss downloading links. Say that a program has a file containing a number of file links to be downloaded. Parallelism is achieved by using a thread pool and downloading each file link independently.

Concurrency

Concurrency means that the working threads don't finish the task until all the other threads finish. In other words, all the threads start and finish at the same time.

Each thread that downloads a file has its own pool of threads to download parts of the file and combine those chunks into one file. So, each inner thread is responsible for downloading one part of the file, and the outer thread's task isn't finished until all the inner threads finish their parts and the file chunks are accumulated into one file.

The sub-thread pool has a number of threads equal to the number of file parts.
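For example, with hypothetical numbers, the byte ranges for a 10 MB file split across 4 threads work out as follows (this mirrors the arithmetic in the download method later in the article):

filesize = 10 * 1024 ** 2     # 10,485,760 bytes
processes = 4
chunk = filesize / processes  # 2,621,440 bytes per part (integer division)
pos = 0
for i in range(processes):
    startByte = pos
    endByte = min(pos + chunk, filesize - 1)
    print 'part %.3d: bytes=%d-%d' % (i, startByte, endByte)
    pos += chunk + 1
# part 000: bytes=0-2621440
# part 001: bytes=2621441-5242881
# part 002: bytes=5242882-7864322
# part 003: bytes=7864323-10485759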

Workers and Tasks

ThreadPool:

import multiprocessing
from ctypes import c_int
from Queue import Queue  # Python 2; named queue in Python 3

# A ctypes-backed shared value that counts the bytes already downloaded.
shared_bytes_var = multiprocessing.Value(c_int, 0)

class ThreadPool:
    """ Pool of threads consuming tasks from a queue """
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """ Add a task to the queue """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """ Add a list of tasks to the queue """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """ Wait for completion of all the tasks in the queue """
        self.tasks.join()

Worker:

import logging
from threading import Thread

class Worker(Thread):
    """ Thread executing tasks from a given tasks queue """
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True  # don't keep the interpreter alive for workers
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # An exception happened in this thread
                print(e)
                logging.error(e)
            finally:
                # Mark this task as done, whether an exception happened or not
                self.tasks.task_done()

Tasks

In the tasks file, we define the methods that will be executed by the thread pools. This file contains two methods:

  1. download, which is executed by the first (outer) thread pool.

  2. DownloadChunk, which is executed by the inner thread pool that each worker thread of the parent pool creates.

import logging
import os
import socket
import time
import urllib2

import settings            # the article's shared-state module (not listed)
import DownloaderHelper    # the article's helper module (partially listed)

def DownloadChunk(url, path, startByte=0, endByte=None):
    '''
    Downloads one chunk of a file.
    @param url: File url address.
    @param path: Destination file path.
    @param startByte: Start byte.
    @param endByte: End byte. Will work only if the server supports HTTP Range headers.
    @return path: Destination file path.
    '''
    remotename = url.split('/')[-1]
    filename = path.split('\\')[-1]
    url = url.replace(' ', '%20')
    headers = {'User-Agent': "Magic Browser"}
    if endByte is not None:
        headers['Range'] = 'bytes=%d-%d' % (startByte, endByte)
    req = urllib2.Request(url, headers=headers)
    try:
        urlObj = DownloaderHelper.urlopen_with_retry(req)
    except urllib2.HTTPError, e:
        if "HTTP Error 416" in str(e):
            # HTTP 416: Requested Range Not Satisfiable. Happens when we ask
            # for a range that is not available on the server, e.g. when the
            # server sends back an HTML page that effectively says "you opened
            # too many connections to our server". If this happens, wait for
            # the other threads to finish their connections and try again.
            print "\nThread didn't get the chunk it was expecting. Retrying..."
            time.sleep(5)
            return DownloadChunk(url, path, startByte, endByte)
        else:
            print "\n", e
            raise

    f = open(path, 'wb')
    meta = urlObj.info()
    try:
        filesize = int(meta.getheaders("Content-Length")[0])
    except IndexError:
        print "\nServer did not send Content-Length."
        logging.error("Server did not send Content-Length.")
        filesize = None  # without it, progress can't be reported below
    filesize_dl = 0
    block_sz = 8192
    while True:
        try:
            buff = urlObj.read(block_sz)
        except (socket.timeout, socket.error, urllib2.HTTPError):
            # Roll back this chunk's contribution to the shared byte
            # counter, then retry the whole chunk.
            settings.shared_bytes_var.value -= filesize_dl
            print "\nRetrying to download chunk file.."
            time.sleep(5)
            return DownloadChunk(url, path, startByte, endByte)

        if not buff:
            break

        filesize_dl += len(buff)
        try:
            settings.shared_bytes_var.value += len(buff)
        except AttributeError:
            # shared_bytes_var was not initialised; skip progress accounting.
            pass
        try:
            f.write(buff)
        except IOError:
            f.close()
            print "\nRetrying to download chunk file.."
            return DownloadChunk(url, path, startByte, endByte)

        if filesize:
            settings.status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (
                filesize_dl / 1024.0 / 1024.0,
                filesize / 1024.0 / 1024.0,
                DownloaderHelper.progress_bar(
                    1.0 * filesize_dl / filesize,
                    remotename if (filesize_dl * 100.0 / filesize == 100) else filename),
                filesize_dl * 100.0 / filesize)
            settings.status += chr(8) * (len(settings.status) + 1)
            print settings.status,
    print "\n"
    f.close()
    logging.info(path)
    return path
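The call to DownloaderHelper.urlopen_with_retry above is not listed in the article. A minimal sketch of what such a helper might look like (the retry count and delay here are assumptions):

import socket
import time
import urllib2

def urlopen_with_retry(req, retries=3, delay=5):
    """ Open a request, retrying a few times on transient network errors. """
    for attempt in range(retries):
        try:
            return urllib2.urlopen(req)
        except (urllib2.URLError, socket.timeout):
            if attempt == retries - 1:
                raise
            time.sleep(delay)

The download method, executed by the outer thread pool, follows: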



def download(url):
    '''
    Downloads a file, splitting it into parallel chunks when the server
    allows it.
    @param url: File url address.
    @return: 1 on success.
    '''
    processes = settings.no_thread_per_file
    path = None
    minChunkFile = 1024 ** 2   # don't split into chunks smaller than 1 MB
    filename = url.split('/')[-1]
    settings.shared_bytes_var.value = 0
    url = url.replace(' ', '%20')
    if not path:
        path = DownloaderHelper.get_rand_filename(os.environ['temp'])
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))

    req = urllib2.Request(url, headers={'User-Agent': "Magic Browser",
                                        "Connection": "keep-alive"})
    urlObj = DownloaderHelper.urlopen_with_retry(req)
    meta = urlObj.info()
    filesize = int(meta.getheaders("Content-Length")[0])


    if (filesize / processes > minChunkFile) and DownloaderHelper.Is_ServerSupportHTTPRange(url):
        # Split the file into one byte range per thread.
        args1 = []
        tempfilelist = []
        pos = 0
        chunk = filesize / processes
        for i in range(processes):
            startByte = pos
            endByte = pos + chunk
            if endByte > filesize - 1:
                endByte = filesize - 1
            args1.append([url, path + ".%.3d" % i, startByte, endByte])
            tempfilelist.append(path + ".%.3d" % i)
            pos += chunk + 1
    else:
        # Small file, or the server doesn't support ranges: download it
        # in one piece.
        args1 = [[url, path + ".000", None, None]]
        tempfilelist = [path + ".000"]

    logging.info(url)
    logging.info(tempfilelist)
    # Inner thread pool: one worker per chunk of this file.
    pool2 = ThreadPool(settings.no_thread_per_file)
    pool2.map(lambda x: DownloadChunk(*x), args1)
    # Poll the queue's unfinished-task counter to report progress while the
    # chunks download (all_tasks_done is a Condition object, not a flag).
    while pool2.tasks.unfinished_tasks:
        settings.status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (
            settings.shared_bytes_var.value / 1024.0 / 1024.0,
            filesize / 1024.0 / 1024.0,
            DownloaderHelper.progress_bar(
                1.0 * settings.shared_bytes_var.value / filesize, filename),
            settings.shared_bytes_var.value * 100.0 / filesize)
        settings.status += chr(8) * (len(settings.status) + 1)
        print settings.status,
        time.sleep(0.1)


    file_parts = tempfilelist
    pool2.wait_completion()
    settings.download_counter += 1
    DownloaderHelper.combine_files(file_parts, filename)
    return 1
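DownloaderHelper.combine_files is called above but not listed in the article either; a minimal sketch, assuming the chunk files in file_parts are already ordered by their numeric suffix:

import os

def combine_files(file_parts, filename):
    """ Concatenate the downloaded chunk files, in order, into one file. """
    with open(filename, 'wb') as out:
        for part in file_parts:
            with open(part, 'rb') as chunk:
                out.write(chunk.read())
            os.remove(part)  # clean up the temporary chunk file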

Main.py:

    import logging
    import settings
    import Tasks
    from ThreadPool import ThreadPool  # assumed module layout

    print 'starting.....'
    logging.info('Starting..')
    # Outer thread pool: one worker per file link. The pool size here is
    # illustrative; the article doesn't show how it is configured.
    pool = ThreadPool(4)
    lines = [line.rstrip('\n') for line in open('links.txt')]
    pool.map(Tasks.download, lines)
    pool.wait_completion()
    print 'Total files downloaded', settings.download_counter
    logging.info('Total files downloaded %d', settings.download_counter)

Now, you know about combining parallelism and concurrency in Python!
