Multiprocessing Goodness Part 1: Use Case

by Steven Lott · Feb. 03, 12
The advantage of multiprocessing is to have multiple processes working on a problem.  If we break a big problem into small, concurrent steps, we can often get results in less elapsed time by making more effective use of the CPU.  Specifically, we want to make use of non-user time where our process might be waiting for something on the network or waiting for physical I/O to finish.

There are limits on the speedup offered by multiprocessing.  Once utilization gets to 100%×cores, we can't go any faster.  However, there are numerous processes that do a lot of I/O or a lot of network access; we can use Python's multiprocessing module to make more effective use of our CPU.

The easiest approach to multiprocessing is to use the shell's pipeline philosophy.  Break the processing up into small steps, each of which reads from a source stream and writes to an output stream.  The long-standing tradition here is to read from `sys.stdin` and write to `sys.stdout`.  The multiprocessing module gives us tools to achieve this with relatively little pain.

Rather than use a simple pipe, however, we need to use a multiprocessing.Queue.  In shell parlance we might have `func1 | func2 | func3`.

For multiprocessing purposes, we'd have something a hair more complex looking.

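    from multiprocessing import Process, Queue

    # func1, func2 and func3 stand in for the three pipeline stages.
    q1 = Queue()  # carries func1's output to func2
    q2 = Queue()  # carries func2's output to func3
    p1 = Process(target=func1, kwargs=dict(output=q1))
    p2 = Process(target=func2, kwargs=dict(input=q1, output=q2))
    p3 = Process(target=func3, kwargs=dict(input=q2))
    p1.start()
    p2.start()
    p3.start()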

While wordy, this hints at a more general way to have three processes pass data to one another.

Termination

The issue is one of termination.  Most multiprocessing packages (like multiprocessing and celery) presume that your processing pipeline has a fairly long lifetime.  Because of this, they presume that you can determine that it's idle and kill it off one process at a time.

This isn't a bad assumption, and probably covers a large number of use cases.

It doesn't, however, cover the simple shell-like `func1 | func2 | func3` use case very well at all.

Why not?

Because we can't easily tell when a queue is shut down for good and all.  A producing process can close a queue, but that's not a piece of information that shows up at the consumer end of the queue.

Queues are designed to be durable and have multiple producers.  There's no easy way to know a Queue is no longer needed.  Each producer would have to attempt to close the Queue and the Queue would have to know the intended number of producers.  If processes are dynamic, then the number of producers may not have a fixed, known-in-advance limit.

The approach, therefore, is to put a sentinel object in the queue.  This way, a consumer knows that production has finished.  It can release resources and exit politely.
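As a rough illustration of the sentinel idea, here is a minimal sketch of a three-stage `numbers | square | total` pipeline.  The stage functions, the `input_queue`/`output_queue` parameter names, and the choice of `None` as the sentinel are all assumptions made for this example, not something the multiprocessing module prescribes.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed end-of-stream marker; None pickles safely between processes


def numbers(output_queue):
    """First stage: emit a few integers, then the sentinel."""
    for n in range(5):
        output_queue.put(n)
    output_queue.put(SENTINEL)


def square(input_queue, output_queue):
    """Middle stage: square each item and pass the sentinel downstream."""
    while True:
        item = input_queue.get()
        if item is SENTINEL:
            break
        output_queue.put(item * item)
    output_queue.put(SENTINEL)


def total(input_queue):
    """Last stage: add up items until the sentinel arrives."""
    result = 0
    while True:
        item = input_queue.get()
        if item is SENTINEL:
            break
        result += item
    print("total:", result)
    return result


if __name__ == "__main__":
    q1, q2 = Queue(), Queue()
    stages = [
        Process(target=numbers, kwargs=dict(output_queue=q1)),
        Process(target=square, kwargs=dict(input_queue=q1, output_queue=q2)),
        Process(target=total, kwargs=dict(input_queue=q2)),
    ]
    for p in stages:
        p.start()
    for p in stages:
        p.join()  # every stage exits on its own once the sentinel reaches it
```

Nothing here has to guess whether a queue is idle: each stage stops when the sentinel reaches it and forwards one sentinel of its own.  This only works as written with exactly one producer and one consumer per queue.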

Fan-Out and Fan-In

The problem with a sentinel on a multi-producer queue is that there will be multiple sentinels, one from each producer.  And, of course, with a multi-consumer queue, there must be one sentinel for each consumer.

If producers adhere to a sentinel-per-consumer rule, and consumers know to expect a sentinel-per-producer, then we can easily create dynamic multiprocessing networks that start up and shut down quickly and cleanly.
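The two counting rules can be sketched as a pair of small helpers.  The names `produce`, `consume` and `worker`, the `None` sentinel, and the doubling step are hypothetical choices for this example; the only point is that a producer owes one sentinel per consumer and a consumer waits for one sentinel per producer.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed end-of-stream marker; None pickles safely between processes


def produce(items, output_queue, consumers):
    """Put every item on the queue, then one sentinel per consumer."""
    for item in items:
        output_queue.put(item)
    for _ in range(consumers):
        output_queue.put(SENTINEL)


def consume(input_queue, producers):
    """Yield items until one sentinel per producer has been seen."""
    remaining = producers
    while remaining > 0:
        item = input_queue.get()
        if item is SENTINEL:
            remaining -= 1
        else:
            yield item


def worker(input_queue, output_queue):
    """Middle stage: one upstream producer, one downstream consumer."""
    doubled = (n * 2 for n in consume(input_queue, producers=1))
    produce(doubled, output_queue, consumers=1)


if __name__ == "__main__":
    work, results = Queue(), Queue()
    workers = [Process(target=worker, args=(work, results)) for _ in range(3)]
    for w in workers:
        w.start()
    # Fan-out: one producer, three consumers, so three sentinels follow the data.
    produce(range(10), work, consumers=len(workers))
    # Fan-in: three producers, so wait for three sentinels before stopping.
    print(sorted(consume(results, producers=len(workers))))
    for w in workers:
        w.join()
```

Each worker takes exactly one sentinel off the shared queue and then stops reading, which is why the producer has to supply one per consumer.  The results are sorted before printing because the order in which workers finish is not predictable.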

Use Case

Here's a use case.  We want to do whois analysis on IP addresses in a log.

If we have a simple loop to parse the log and do a whois request on each host IP address, the processing will be slow.  It uses approximately no CPU, since it spends almost all of its time waiting for input from the log, waiting for whois, or waiting for buffers to be written in the output file.

If we make a simple three-step pipeline (parse | whois | report) then we get some improvement in elapsed time, but -- really -- the whois step is killing the throughput.

What we need is a way to run a dozen whois requests concurrently.  This leads us to multiprocessing, fan-out and fan-in.

Here's what we want.

    from multiprocessing import Queue

    # ProducerProcess, ConsumerProducerProcess and ConsumerProcess are not part of
    # the standard library; they are the wrappers this design calls for.
    def analyze_ip(logs):
        user_queue = Queue()
        report_queue = Queue()

        # Fan-out: one producer feeding 12 consumers, so it owes 12 sentinels.
        user_from_log = ProducerProcess(
            name='book_users', target=book_users, args=(logs,),
            output_queue=user_queue, consumers=12)
        user_from_log.start()

        workers = []
        for _ in range(12):
            get_details = ConsumerProducerProcess(
                name='user_whois', target=user_whois, kwargs=dict(LIVE=False),
                input_queue=user_queue, output_queue=report_queue,
                producers=1, consumers=1)
            get_details.start()
            workers.append(get_details)

        # Fan-in: the report expects one sentinel from each of the 12 workers.
        report = ConsumerProcess(
            name='final_report', target=final_report,
            input_queue=report_queue, producers=12)
        report.start()

        user_from_log.join()
        for w in workers:
            w.join()
        report.join()

This will do a number of concurrent whois requests, tying up lots and lots of resources and (hopefully) saturating the CPU with real work.

This shows a fan-out from one ProducerProcess to a dozen ConsumerProducerProcess instances.  It shows a fan-in from the ConsumerProducerProcess to a single ConsumerProcess that writes the final report.

This is trivially scaled up (or down) by changing the number of processes in the middle.

What's important is that the actual functions involved (book_users, user_whois and final_report) are relatively trivial generator functions that consume source data (log files or queue entries) and produce results (queue entries or a report file).
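To give a feel for how small such functions can be, here is a hypothetical sketch of the three.  The real implementations are not shown in this part, so everything below is an assumption: that `logs` is an iterable of log lines beginning with an IP address, that `LIVE=False` means "skip the network lookup", and that the report is written as tab-separated lines to a file-like object.

```python
import re
import sys

# Assumed log format: each useful line starts with a dotted-quad IP address.
IP_PATTERN = re.compile(r"^(\d{1,3}(?:\.\d{1,3}){3})\s")


def book_users(logs):
    """Yield the leading IP address of each log line that has one."""
    for line in logs:
        match = IP_PATTERN.match(line)
        if match:
            yield match.group(1)


def user_whois(addresses, LIVE=False):
    """Yield (ip, details) pairs; the real lookup is left out of this sketch."""
    for ip in addresses:
        if LIVE:
            raise NotImplementedError("a real whois lookup is outside this sketch")
        yield ip, "(whois lookup skipped)"


def final_report(rows, out=sys.stdout):
    """Write one tab-separated line per row and return the number written."""
    count = 0
    for ip, details in rows:
        out.write(f"{ip}\t{details}\n")
        count += 1
    return count


if __name__ == "__main__":
    sample = [
        '10.0.0.1 - - [03/Feb/2012] "GET / HTTP/1.1" 200 512\n',
        "not a log line\n",
    ]
    # Chained directly, the functions form the slow serial version of the pipeline.
    final_report(user_whois(book_users(sample)))
```

Because each function only iterates over its input, the same code can be fed from a file in a single process or from a queue inside a worker process; the sentinel handling stays out of the function bodies.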

Also important is the fact that it closes down cleanly.  When the input reaches end-of-file, sentinel values are put into the queues to trickle through and lead to orderly process shutdown.



Source: http://slott-softwarearchitect.blogspot.com/2012/02/multiprocessing-goodness-part-1-use.html