DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags/keywords, and share them with the world.

Snippets has posted 5883 posts at DZone. View Full User Profile

Concurrent Map

06.17.2011
| 2108 views |
  • submit to reddit
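        A simplified Map/Reduce running in a single process with multiple threads.
It is useful when the generator and the map steps are I/O-bound (for example, reading many files): on MRI, threads overlap I/O but do not execute Ruby code in parallel.
The 'reduce' step is not parallelized.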

require 'thread'
require 'timeout'



######################## Parallel Map engine ######################################

# Minimal in-process Map/Reduce built on threads and queues.
#
# Options (Hash):
#   :nbThread  - number of mapper threads (default 4)
#   :generator - proc called once with a Queue; it pushes the work items
#                (required)
#   :mapper    - proc called as mapper.call(out, item) for each item; it
#                appends any number of results to the Array +out+ (required)
#   :reducer   - optional proc called once with the flat, ordered result
#                Array; its return value becomes the final result
#   :debug     - when true, print the number of generated items
#
# Results are ordered by item generation order, then by the order in which
# the mapper appended them, whichever thread ran the mapper. Only the map
# phase is concurrent; the reducer runs on a single thread.
class ParalleMap
  # Private end-of-stream marker. It is a fresh object rather than a Symbol,
  # so no item pushed by a generator can be mistaken for it.
  EOS = Object.new.freeze

  def initialize(options)
    @nbThread  = options[:nbThread]  || 4
    @generator = options[:generator] || raise("missing generator proc")
    @mapper    = options[:mapper]    || raise("missing mapper proc")
    @reducer   = options[:reducer]
    @debug     = options[:debug]     || false

    @query  = Queue.new   # [index, item] pairs waiting to be mapped
    @result = Queue.new   # [index, rank, value] triples produced by mappers
    @lthread = (1..@nbThread).map { Thread.new { mapping(@query, @result) } }
    @th = Thread.new { generating; do_synthese }
  end

  # Runs the generator on its own thread, numbers every item it produces and
  # forwards the numbered items to the mappers.
  #
  # One EOS per mapper is always sent, even when the generator raises, so the
  # mapper threads terminate. A generator exception is then re-raised here
  # through Thread#join instead of leaving this loop blocked forever.
  def generating
    mbx = Queue.new
    producer = Thread.new do
      begin
        @generator.call(mbx)
      ensure
        mbx << EOS   # unblock the loop below even if the generator fails
      end
    end
    count = 0
    until (item = mbx.pop).equal?(EOS)
      @query << [count, item]
      count += 1
    end
    producer.join   # re-raises a generator exception, if any
    puts "nb file #{count}" if @debug
  ensure
    @nbThread.times { @query << EOS }   # stop message for each mapper
  end

  # Waits for every mapper, gathers their output, restores generation order
  # and applies the optional reducer. The outcome is stored in @resultats.
  def do_synthese
    @lthread.each(&:join)
    # Every mapper has exited, so the result queue's size is final.
    entries = Array.new(@result.size) { @result.pop }
    values = entries.sort_by { |index, rank, _| [index, rank] }.map { |_, _, value| value }
    @resultats = @reducer ? @reducer.call(values) : values
  end

  # Returns the final result, blocking until generation, mapping and
  # reduction are complete. Re-raises an exception raised by the generator
  # or the reducer.
  def get_result
    @th.join
    @resultats
  end

  # Worker loop run by each mapper thread. It takes [index, item] pairs from
  # +queue+ until EOS, calls the mapper and pushes [index, rank, value]
  # triples to +result+.
  #
  # A mapper error is reported on stderr, so it does not mix with the results
  # on stdout. That item's output is dropped and the worker keeps going.
  def mapping(queue, result)
    loop do
      job = queue.shift
      return if job.equal?(EOS)
      index, item = job
      begin
        out = []
        @mapper.call(out, item)
        out.each_with_index { |value, rank| result << [index, rank, value] }
      rescue StandardError => e
        warn "ERROR in mapper on %s : %s" % [item.inspect, e.to_s]
      end
    end
  end
end


And here is an example use, a recursive file grep:

############################# Invoke a block for each matching file name
# Recursively walks +root+ and calls +blk+ with the path of every
# non-directory entry whose lower-cased basename matches the File.fnmatch
# pattern +filter+. Because the basename is lower-cased, +filter+ should
# itself be lower case.
#
# Entries whose names begin with "." are skipped, as Dir.glob("*") skips
# them. The directory is listed with Dir.entries rather than
# Dir.glob("#{root}/*"). That way glob metacharacters in +root+ ("[", "{",
# "*", "?") are not interpreted as a pattern, which made files under such
# directories silently disappear.
#
# Entries are visited in sorted order so output order is reproducible. A
# directory that cannot be listed (missing, unreadable) is skipped silently,
# as Dir.glob did.
#
# NOTE: File.directory? follows symbolic links, so a symlink cycle makes the
# walk recurse without bound.
def rfind(root, filter, &blk)
  begin
    names = Dir.entries(root)
  rescue SystemCallError
    return
  end
  names.sort.each do |bn|
    next if bn.start_with?('.')   # hidden entries, "." and ".."
    en = File.join(root, bn)
    if File.directory?(en)
      rfind(en, filter, &blk)
    elsif File.fnmatch(filter, bn.downcase)
      blk.call(en)
    end
  end
end

####################### Map : grep on one file  #####################
# Mapper: for every line of +file+ that matches +matcher+, appends one
# "file:NNNNNNNNN:text" entry to +out+.
#
# out     - Array collecting the results
# matcher - Regexp to search for
# file    - path of the file to scan
#
# Line numbers are 1-based, as grep reports them, and zero-padded to nine
# digits; +text+ keeps its trailing newline. The file is streamed line by
# line instead of being read into memory at once.
#
# NOTE: a line that is invalid in the default external encoding makes `=~`
# raise ArgumentError, which propagates to the caller.
def selectLine(out, matcher, file)
  File.open(file, "r") do |f|
    f.each_line.with_index(1) do |line, nol|
      out << "%s:%09d:%s" % [file, nol, line] if matcher =~ line
    end
  end
end

####################### Reduce : reformat "file:NNNNNNNNN:text" entries
# Reducer: turns each zero-padded "file:NNNNNNNNN:text" entry into
# "file:N :: text". Order is left as received.
#
# The line-number field is located as the first ":<9+ digits>:" run rather
# than by splitting on the first ":". File names containing ":" (for example
# "C:/src/x.rb") are therefore kept intact. An all-zero number renders as
# "0", and an entry without that shape is returned unchanged.
def reduce(l)
  l.map do |s|
    m = /\A(.*?):(\d{9,}):(.*)\z/m.match(s)
    next s unless m
    "%s:%d :: %s" % [m[1], m[2].to_i, m[3]]
  end
end


####################################################################################
#                                M A I N                                           #
####################################################################################

# Usage: pgrep [-v] regexp path file-filter
#   -v           print the number of scanned files and the elapsed time
#   regexp       Ruby regular expression searched for in each line
#   path         root directory of the recursive search
#   file-filter  fnmatch pattern ('*.rb') or a bare extension ('rb')

# Boolean flag. The previous `debug != nil` test was always true, because
# `false != nil`, so "nb file" was printed even without -v.
debug = false
if ARGV[0] == "-v"
  ARGV.shift
  debug = true
end

abort("Usage : > pgrep [-v] regexp path 'file-filter'") if ARGV.length != 3
begin
  query = Regexp.new(ARGV[0])
rescue RegexpError => e
  abort("Invalid regexp #{ARGV[0].inspect}: #{e.message}")
end
path = ARGV[1]
ext  = ARGV[2]
ext  = '*.' + ext unless ext =~ /\*/   # a bare extension becomes '*.ext'

starting = Time.now.to_f
pm = ParalleMap.new(
  :nbThread  => 5,
  :generator => proc { |res| rfind(path, ext.downcase) { |file| res << file } },
  :mapper    => proc { |out, in_file_name| selectLine(out, query, in_file_name) },
  :reducer   => proc { |rr| reduce(rr) },
  :debug     => debug
)
result = pm.get_result

ending = Time.now.to_f
result.each { |s| puts s }
puts "\n Duration: #{ending - starting} secs" if debug


Benchmark
=========
On a Core i7 (8 cores, 8 GB RAM!) running Windows 7:

Ruby 1.9.2:

>ruby pgrep.rb -v TestHtml  . '*.rb'
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase

 Duration: 3.6732099056243896 secs

With MSYS find/grep:
>chrono gfind . -name '*.rb'  -exec grep TestHtml  {} /dev/null ;
gfind . -name '*.rb' -exec grep TestHtml {} /dev/null ;
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:class TestHtml5Inputstream < Test::Unit::TestCase
Duree 41619.0 ms

With grep -r:
>chrono grep -r --include=*.rb TestHtml  .
grep -r '--include=*.rb' TestHtml .
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:class TestHtml5Inputstream < Test::Unit::TestCase
Duree 1639.0 ms

JRuby (1.5, Java 6):
>jruby pgrep.rb -v TestHtml  . '*.rb'
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase

 Duration: 0.977999925613403 secs

IronRuby (1.1.3, .NET 4.0):
>ir pgrep.rb -v TestHtml  . rb
nb file 2805
./wiki/instiki/attic/vendor/plugins/HTML5lib/test/test_input_stream.rb:4 :: class TestHtml5Inputstream < Test::Unit::TestCase

 Duration: 0.908051013946533 secs


(where chrono.rb is:
# chrono.rb: echo a command line, run it, then report its wall-clock time in ms.
# NOTE(review): arguments containing '*' are wrapped in single quotes and ARGV
# itself is replaced. The quoted text is therefore what system() receives, not
# only what is echoed. This is presumably meant to stop MSYS from
# glob-expanding patterns on Windows. With multiple arguments, system() runs no
# shell, so on POSIX the quotes reach the command literally. Confirm before
# reusing elsewhere.
quoted = ARGV.map { |arg| (arg =~ /\*/) ? "'" + arg + "'" : arg }
ARGV.replace(quoted)
puts ARGV.join(" ")
started = Time.now
system(*ARGV)
elapsed_ms = (Time.now - started) * 1000
puts "Duree " + elapsed_ms.to_s + " ms"
)