Multiprocessing Goodness Part 2: Class Definitions
This post is the second part of Steven Lott's series on Multiprocessing Goodness. Read Part 1.
The multiprocessing module includes a generic Process class, which can be used to wrap a simple function.
The function must be designed to work with Queues or Pipes or other synchronization techniques.
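As a minimal sketch of that plain Process-plus-Queue style, something like the following works; the worker function and its sentinel handling here are illustrative, not from the original post:

    from multiprocessing import Process, Queue

    def worker( in_q ):
        # Consume until the None sentinel arrives.
        for item in iter( in_q.get, None ):
            print( item )

    if __name__ == '__main__':
        q = Queue()
        p = Process( target=worker, args=(q,) )
        p.start()
        for i in range(5):
            q.put( i )
        q.put( None )   # sentinel: tell the worker to stop
        p.join()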
There's an advantage, however, to defining a class which gracefully handles generator functions. If we have Generator-Aware multi-processing, we can (1) write our algorithms as generators and then (2) trivially connect Processes with Queues to improve scalability.
We're looking at creating processing "pipelines" using Queues. That way we can easily handle multiple-producer and multiple-consumer (fan-in, fan-out) processing that enhances concurrency.
We have three use cases: Producer, Consumer and Consumer-Producer.
Producer
A Producer gets data from somewhere and populates a queue with it. This is the source that feeds data into the pipeline.

    from multiprocessing import Process

    class ProducerProcess( Process ):
        """Produces items into a Queue.

        The "target" must be a generator function which yields
        picklable items.
        """
        def __init__( self, group=None, target=None, name=None,
                args=None, kwargs=None, output_queue=None, consumers=0 ):
            super( ProducerProcess, self ).__init__( name=name )
            self.target= target
            self.args= args if args is not None else []
            self.kwargs= kwargs if kwargs is not None else {}
            self.output_queue= output_queue
            self.consumers= consumers
        def run( self ):
            target= self.target
            for item in target(*self.args, **self.kwargs):
                self.output_queue.put( item )
            # One sentinel for each downstream consumer.
            for x in range(self.consumers):
                self.output_queue.put( None )
            self.output_queue.close()
This class will wrap a "target" function which must be a generator. Every value yielded is put into the "output_queue". When the source data runs out, enough sentinel tokens are put into the queue to satisfy all consumers.
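As a minimal sketch, a single ProducerProcess can be wired to a Queue and drained by the parent process like this; the count_source generator is a hypothetical stand-in:

    from multiprocessing import Queue

    def count_source( limit ):
        for i in range(limit):
            yield i

    if __name__ == '__main__':
        out_q = Queue()
        producer = ProducerProcess( target=count_source, args=(5,),
            output_queue=out_q, consumers=1 )
        producer.start()
        # The parent plays the single consumer; iter() stops at the
        # one None sentinel the producer appends (consumers=1).
        for item in iter( out_q.get, None ):
            print( item )
        producer.join()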
Consumer
A Consumer gets data from a queue and does some final processing. Perhaps it loads a database, or writes a file. It is the sink that consumes data on the pipeline.

    class ConsumerProcess( Process ):
        """Consumes items from a Queue.

        The "target" must be a function which expects an iterable
        as its only argument. Therefore, the args value is not used here.
        """
        def __init__( self, group=None, target=None, name=None,
                kwargs=None, input_queue=None, producers=0 ):
            super( ConsumerProcess, self ).__init__( name=name )
            self.target= target
            self.kwargs= kwargs if kwargs is not None else {}
            self.input_queue= input_queue
            self.producers= producers
        def items( self ):
            # Count sentinels: stop only after every upstream
            # producer's None has been seen.
            while self.producers != 0:
                for item in iter( self.input_queue.get, None ):
                    yield item
                self.producers -= 1
        def run( self ):
            target= self.target
            target( self.items(), **self.kwargs )
This class will wrap a "target" function which must be ready to work with any iterable. Every value from the queue will be provided to the target function for processing. When enough sentinel tokens have been consumed from the producers, it terminates processing. Since each producer puts one sentinel for every consumer, each consumer stops after counting exactly one sentinel per producer.
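A minimal sketch with the parent process playing the producer role; the show function is a hypothetical sink:

    from multiprocessing import Queue

    def show( source ):
        for item in source:
            print( item )

    if __name__ == '__main__':
        in_q = Queue()
        consumer = ConsumerProcess( target=show, input_queue=in_q,
            producers=1 )
        consumer.start()
        for i in range(5):
            in_q.put( i )
        in_q.put( None )   # exactly one sentinel, since producers=1
        consumer.join()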
Consumer-Producer
The middle of a processing pipeline consists of consumer-producer processes, which consume from one queue and then produce to another queue.

    class ConsumerProducerProcess( Process ):
        """Consumes items from a Queue and produces items onto a Queue.

        The "target" must be a generator function which yields
        picklable items and which expects an iterable as its only
        argument. Therefore, the args value is not used here.
        """
        def __init__( self, group=None, target=None, name=None,
                kwargs=None, input_queue=None, producers=0,
                output_queue=None, consumers=0 ):
            super( ConsumerProducerProcess, self ).__init__( name=name )
            self.target= target
            self.kwargs= kwargs if kwargs is not None else {}
            self.input_queue= input_queue
            self.producers= producers
            self.output_queue= output_queue
            self.consumers= consumers
        def items( self ):
            # Count sentinels: stop only after every upstream
            # producer's None has been seen.
            while self.producers != 0:
                for item in iter( self.input_queue.get, None ):
                    yield item
                self.producers -= 1
        def run( self ):
            target= self.target
            for item in target(self.items(), **self.kwargs):
                self.output_queue.put( item )
            # Alert each downstream consumer that we're done.
            for x in range(self.consumers):
                self.output_queue.put( None )
            self.output_queue.close()
This class will wrap a "target" function which must be a generator function that consumes an iterable.
Every value from the queue is provided to the target generator. Every value yielded by the generator is sent to the output queue. The input side counts sentinels to know when to stop. The output side produces enough sentinels to alert downstream processes.
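Putting the three classes together, here is a sketch of a complete fan-out/fan-in pipeline: one producer feeds two parallel consumer-producer workers through one queue, and both workers feed a single consumer through a second queue. The read_lines, parse, and load functions are hypothetical stand-ins, kept trivial so the example is self-contained; note how the producers= and consumers= counts make the sentinel bookkeeping work out.

    from multiprocessing import Queue

    def read_lines( filename ):
        # Hypothetical source: yields sample records rather than
        # actually reading the named file.
        for i in range(10):
            yield "%s line %d" % ( filename, i )

    def parse( source ):
        # Hypothetical transform.
        for line in source:
            yield line.upper()

    def load( source ):
        # Hypothetical sink.
        for item in source:
            print( item )

    if __name__ == '__main__':
        q1 = Queue()
        q2 = Queue()
        # The producer feeds two workers, so it adds two sentinels to q1.
        producer = ProducerProcess( target=read_lines, args=("some.log",),
            output_queue=q1, consumers=2 )
        # Each worker stops after one sentinel (producers=1) and adds
        # one sentinel to q2 (consumers=1).
        workers = [
            ConsumerProducerProcess( target=parse, input_queue=q1,
                producers=1, output_queue=q2, consumers=1 )
            for _ in range(2) ]
        # The sink stops after it has seen both workers' sentinels.
        sink = ConsumerProcess( target=load, input_queue=q2, producers=2 )
        for proc in [ producer ] + workers + [ sink ]:
            proc.start()
        for proc in [ producer ] + workers + [ sink ]:
            proc.join()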
Target Functions
A producer function must be a generator function of this form:

    def prod( *args ):
        for item in some_function(*args):
            yield item
A consumer function looks like this:
    def cons( source ):
        for item in source:
            final_disposition(item)
Finally, a consumer-producer function looks like this.
    def cons_prod( source ):
        for item in source:
            next_value= transform(item)
            yield next_value
These functions can be tested and debugged in a single process, without any queues, by composing them directly:

    cons( cons_prod( prod( *args ) ) )
That way we're confident that our algorithm is correct before attempting to scale it with multiprocessing.
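For example, using the hypothetical read_lines, parse, and load stand-ins from the pipeline sketch above, the single-process check is just:

    load( parse( read_lines( "some.log" ) ) )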
Source: http://slott-softwarearchitect.blogspot.com/2012/02/multiprocessing-goodness-part-2-class.html