DZone

Multiprocessing Goodness Part 2: Class Definitions

By Steven Lott · Feb. 13, 12 · Interview

This post is the second part of Steven Lott's series on Multiprocessing Goodness.  Read Part 1.

The multiprocessing module includes a generic Process class, which can be used to wrap a simple function.

The function must be designed to work with Queues, Pipes, or other synchronization techniques.
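
A minimal sketch of that generic approach, assuming a POSIX system where the `fork` start method is available (chosen so the snippet runs at module level without a `__main__` guard). The hypothetical `square_worker` function has to speak the queue protocol itself: it reads until it sees a `None` sentinel and writes each result to an output queue. That coupling between the algorithm and the plumbing is what the classes below are meant to remove.

```python
import multiprocessing

# Assumption: the POSIX-only "fork" start method, so no __main__ guard is needed.
# With "spawn" (the Windows default), move the demo under `if __name__ == "__main__":`.
ctx = multiprocessing.get_context("fork")


def square_worker(input_queue, output_queue):
    """Hypothetical worker: it must know about queues and sentinels itself."""
    # Read until the None sentinel arrives.
    for item in iter(input_queue.get, None):
        output_queue.put(item * item)
    # Tell the reader that no more results are coming.
    output_queue.put(None)


def run_demo(values):
    input_queue = ctx.Queue()
    output_queue = ctx.Queue()
    worker = ctx.Process(target=square_worker, args=(input_queue, output_queue))
    worker.start()
    for value in values:
        input_queue.put(value)
    input_queue.put(None)  # sentinel: end of input
    input_queue.close()    # this process will put nothing more
    # Drain the output before join() so the worker is never blocked on a full pipe.
    results = list(iter(output_queue.get, None))
    worker.join()
    return results


print(run_demo([1, 2, 3]))  # [1, 4, 9]
```

With a single worker reading a FIFO queue, results come back in input order.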

There's an advantage, however, to defining a class that gracefully handles generator functions. If we have Generator-Aware multiprocessing, we can (1) write our algorithms as generators and then (2) trivially connect Processes with Queues to improve scalability.

We're looking at creating processing "pipelines" using Queues.  That way we can easily handle multiple-producer and multiple-consumer (fan-in, fan-out) processing that enhances concurrency.

We have three use cases:  Producer, Consumer and Consumer-Producer.

Producer

A Producer gets data from somewhere and populates a queue with it.  This is the source that feeds data into the pipeline.

from multiprocessing import Process

class ProducerProcess( Process ):
    """Produces items into a Queue.

    The "target" must be a generator function which yields
    picklable items.
    """
    def __init__( self, group=None, target=None, name=None, args=None, kwargs=None, output_queue=None, consumers=0 ):
        super( ProducerProcess, self ).__init__( name=name )
        self.target= target
        self.args= args if args is not None else []
        self.kwargs= kwargs if kwargs is not None else {}
        self.output_queue= output_queue
        self.consumers= consumers
    def run( self ):
        target= self.target
        # Every value the generator yields goes onto the output queue.
        for item in target(*self.args, **self.kwargs):
            self.output_queue.put( item )
        # One None sentinel per downstream consumer, so each one can stop.
        for _ in range(self.consumers):
            self.output_queue.put( None )
        # No more puts from this process; buffered items are still flushed
        # by the queue's background thread before the process exits.
        self.output_queue.close()

This class wraps a "target" function, which must be a generator function. Every value it yields is put into the "output_queue". When the source data runs out, one sentinel token (None) per consumer is put into the queue, so that every consumer knows when to stop.
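
To see why the producer emits one sentinel per consumer, here is a small sketch that swaps threads and `queue.Queue` in for processes (the sentinel protocol is the same, and threads are easier to run and inspect). The hypothetical `produce` helper mirrors `ProducerProcess.run()`: it drains a generator into the queue and then adds `consumers` copies of `None`. Each consumer stops at the first `None` it takes, so with fewer sentinels than consumers, at least one consumer would block forever on `get()`.

```python
import queue
import threading


def produce(generator, output_queue, consumers):
    """Mirror of ProducerProcess.run(): items first, then one sentinel per consumer."""
    for item in generator:
        output_queue.put(item)
    for _ in range(consumers):
        output_queue.put(None)


def consume(input_queue, collected):
    """Each consumer stops at the first None it receives."""
    for item in iter(input_queue.get, None):
        collected.append(item)


def run_fan_out(n_items, n_consumers):
    q = queue.Queue()
    buckets = [[] for _ in range(n_consumers)]
    workers = [
        threading.Thread(target=consume, args=(q, bucket)) for bucket in buckets
    ]
    for worker in workers:
        worker.start()
    produce((i for i in range(n_items)), q, n_consumers)
    for worker in workers:
        worker.join()  # returns only because every consumer got a sentinel
    return buckets


buckets = run_fan_out(10, 3)
# How the items are split between consumers varies from run to run,
# but none are lost.
print(sorted(item for bucket in buckets for item in bucket))
```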

Consumer

A Consumer gets data from a queue and does some final processing.  Perhaps it loads a database, or writes a file.  It is the sink that consumes data on the pipeline.

class ConsumerProcess( Process ):
    """Consumes items from a Queue.

    The "target" must be a function which expects an iterable as its
    only positional argument (keyword arguments come from kwargs).
    Therefore, there is no args parameter.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0 ):
        super( ConsumerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
    def items( self ):
        # Keep reading until every upstream producer has sent its None sentinel.
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        # The target sees an ordinary iterable; the queue handling stays here.
        target( self.items(), **self.kwargs )

This class wraps a "target" function, which must be able to work with any iterable. Every value from the queue is provided to the target function for processing. Once one sentinel token has been received from each producer, the iterable ends and processing terminates.
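
The sentinel-counting loop in `items()` is what makes fan-in work. Here is a single-threaded sketch using `queue.Queue` (an assumption made so the interleaving is deterministic): two hypothetical producers, A and B, each finish with one `None`, and a generator with the same structure as `ConsumerProcess.items()` keeps reading until it has seen both sentinels.

```python
import queue


def items(input_queue, producers):
    """Same logic as ConsumerProcess.items(), written as a plain function."""
    while producers != 0:
        # iter(callable, sentinel) calls get() until it returns None.
        for item in iter(input_queue.get, None):
            yield item
        producers -= 1  # one upstream producer has finished


q = queue.Queue()
# Producers A and B interleave; each ends with its own sentinel.
for value in ["a1", "b1", "a2", None, "b2", None]:
    q.put(value)

print(list(items(q, producers=2)))  # ['a1', 'b1', 'a2', 'b2']
```

If `producers` were set to 1 on the same queue contents, the consumer would stop at A's sentinel and never read `'b2'`, which is why the count must match the number of upstream processes.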

Consumer-Producer

The middle of a processing pipeline consists of consumer-producer processes, which consume from one queue and then produce to another queue.

class ConsumerProducerProcess( Process ):
    """Consumes items from a Queue and produces items onto a Queue.

    The "target" must be a generator function which yields
    picklable items and which expects an iterable as its
    only positional argument (keyword arguments come from kwargs).
    Therefore, there is no args parameter.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0, output_queue=None, consumers=0 ):
        super( ConsumerProducerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
        self.output_queue= output_queue
        self.consumers= consumers
    def items( self ):
        # Input side: stop only after one sentinel from each upstream producer.
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        # Output side: forward everything the target generator yields.
        for item in target(self.items(), **self.kwargs):
            self.output_queue.put( item )
        # One sentinel per downstream consumer.
        for _ in range(self.consumers):
            self.output_queue.put( None )
        self.output_queue.close()

This class wraps a "target" function, which must be a generator function that consumes an iterable.
Every value from the input queue is provided to the target generator, and every value the generator yields is sent to the output queue. The input side counts sentinels to know when to stop; the output side produces enough sentinels to alert downstream processes.
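
Wiring the three kinds of process together is mostly a matter of getting the sentinel counts right. The sketch below uses plain functions as hypothetical stand-ins for `ProducerProcess`, `ConsumerProducerProcess`, and `ConsumerProcess`, and assumes the POSIX `fork` start method so it runs without a `__main__` guard. One producer fans out to several workers, which fan back in to one consumer. Unlike the article's `ConsumerProcess`, whose `run()` ignores the target's return value, the consumer stage here sends its result back on an extra queue so the parent can check it.

```python
import multiprocessing

# Assumption: POSIX "fork" start method, so no __main__ guard is needed.
ctx = multiprocessing.get_context("fork")


def items(input_queue, producers):
    # Yield until one sentinel has arrived from every upstream producer.
    while producers != 0:
        yield from iter(input_queue.get, None)
        producers -= 1


def producer_stage(target, args, output_queue, consumers):
    for item in target(*args):
        output_queue.put(item)
    for _ in range(consumers):
        output_queue.put(None)


def middle_stage(target, input_queue, producers, output_queue, consumers):
    for item in target(items(input_queue, producers)):
        output_queue.put(item)
    for _ in range(consumers):
        output_queue.put(None)


def consumer_stage(target, input_queue, producers, result_queue):
    # Send the final result back so the parent process can inspect it.
    result_queue.put(target(items(input_queue, producers)))


def numbers(limit):      # hypothetical producer target: a generator
    yield from range(limit)


def squares(source):     # hypothetical consumer-producer target
    for n in source:
        yield n * n


def total(source):       # hypothetical consumer target
    return sum(source)


def run_pipeline(limit, workers):
    q1, q2, results = ctx.Queue(), ctx.Queue(), ctx.Queue()
    processes = [
        # One producer feeding `workers` consumers: emit `workers` sentinels.
        ctx.Process(target=producer_stage, args=(numbers, (limit,), q1, workers)),
        # One consumer fed by `workers` producers: wait for that many sentinels.
        ctx.Process(target=consumer_stage, args=(total, q2, workers, results)),
    ]
    # Each middle worker reads from 1 producer and feeds 1 consumer.
    processes += [
        ctx.Process(target=middle_stage, args=(squares, q1, 1, q2, 1))
        for _ in range(workers)
    ]
    for p in processes:
        p.start()
    answer = results.get()  # read before join() to avoid blocking on the queue
    for p in processes:
        p.join()
    return answer


print(run_pipeline(10, workers=3))  # 285
```

Because the producer puts all of its items before its sentinels, each middle worker takes exactly one `None` and then stops reading, so no item is left stranded in the first queue.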

Target Functions

A producer function must be a generator function of this form:

def prod( *args ):
    for item in some_function(*args):
        yield item

A consumer function looks like this:

def cons( source ):
    for item in source:
        final_disposition(item)

Finally, a consumer-producer function looks like this.

def cons_prod( source ):
    for item in source:
        next_value= transform(item)
        yield next_value

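These functions can be tested and debugged in a single process by chaining them directly. Since the consumer returns nothing, first inspect the values it would receive, then run the whole chain:

for value in cons_prod( prod( *args ) ):
    print( value )
cons( cons_prod( prod( *args ) ) )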

That way we can be confident that our algorithm is correct before attempting to scale it with multiprocessing.
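
A concrete, single-process version of that check, with hypothetical stand-ins for `some_function`, `transform`, and `final_disposition` (here: `range()`, squaring, and appending to a list). The extra `sink` keyword argument is an assumption for the demo; in a `ConsumerProcess` it would arrive through `kwargs`, although a plain list filled in a child process would not be visible back in the parent.

```python
def prod(limit):
    # Hypothetical producer: some_function is range() here.
    for item in range(limit):
        yield item


def cons_prod(source):
    # Hypothetical transform: square each value.
    for item in source:
        yield item * item


def cons(source, sink):
    # Hypothetical final_disposition: append to a list instead of a file or database.
    for item in source:
        sink.append(item)


results = []
cons(cons_prod(prod(5)), sink=results)
print(results)  # [0, 1, 4, 9, 16]
```

Because each stage only sees an iterable, the same three functions can later be handed unchanged to the process classes as their targets.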



Source: http://slott-softwarearchitect.blogspot.com/2012/02/multiprocessing-goodness-part-2-class.html
