-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent.rb
48 lines (43 loc) · 1.02 KB
/
concurrent.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#! /usr/bin/ruby19
require 'revactor'
require 'mapreduce'
# MapReduce variant that runs the map phase in one Revactor actor per input
# element and gathers/reduces the results in a separate coordinating actor.
#
# Uses the @map and @reduce callables, which are not assigned anywhere in this
# class -- presumably the MapReduce superclass provides them (confirm there).
class Concurrent < MapReduce
  # Runs the job over +input+, blocking the calling actor until the reduce
  # actor replies.
  #
  # input:: collection responding to +each+ and +size+; every element is
  #         handed to its own map actor.
  #
  # The reduced hash is stored in @output. The method's value is whatever
  # Actor.receive yields for the matched clause (presumably that same hash --
  # verify against Revactor's receive semantics).
  def run(input)
    @input = input
    caller_actor = Actor.current
    Actor.spawn { do_reduce(caller_actor) }
    # Wait for a two-element message whose first element is the calling actor
    # itself; that is the shape do_reduce sends back.
    Actor.receive do |filter|
      filter.when(T[caller_actor, Object]) { |_, obj| @output = obj }
    end
  end

  #######
  private
  #######

  # Body of the coordinating actor: spawns one linked map actor per element of
  # @input, collects what they emit, reduces each key and sends the resulting
  # hash to +parent+ as [parent, result].
  #
  # parent:: the actor that is waiting in #run for the reply.
  def do_reduce(parent)
    reduce_actor = Actor.current
    # Trap exits so the termination of each linked map actor shows up as an
    # :exit message that collect_partials can count.
    reduce_actor.trap_exit = true
    @input.each do |element|
      Actor.spawn_link { @map.call(reduce_actor, element) }
    end
    result = collect_partials.each_with_object({}) do |(key, values), reduced|
      # Only the second element of @reduce's return value is kept
      # (presumably a [key, reduced_value] pair -- confirm with @reduce).
      reduced[key] = @reduce.call(key, values)[1]
    end
    parent << [parent, result]
  end

  # Receives messages until one :exit notification has been seen for every
  # element of @input, grouping the [String, Integer] messages seen meanwhile.
  #
  # Returns a Hash mapping each String key to the Array of Integer values
  # received for it, in arrival order.
  def collect_partials
    pending = @input.size
    partials = {}
    while pending > 0
      Actor.receive do |filter|
        filter.when(Case[:exit, Actor, Object]) { pending -= 1 }
        # Match on Integer rather than Fixnum: Integer is Fixnum's superclass
        # on old Rubies, so values beyond the Fixnum range are no longer left
        # unmatched in the mailbox, and Fixnum itself was deprecated in
        # Ruby 2.4 and later removed.
        filter.when(T[String, Integer]) do |key, value|
          (partials[key] ||= []) << value
        end
      end
    end
    partials
  end
end