summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-17 22:41:15 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-17 22:41:15 -0700
commit31b553dcbcc9623eb75a8faa11c01e627851bd37 (patch)
tree55e5c699b0917d709cb033d055469114f9c8ef4e
parentede39a9532b56f24a77efe34086d0e8e13082cc2 (diff)
downloadchef-31b553dcbcc9623eb75a8faa11c01e627851bd37.tar.gz
Add each_with_exceptions to allow all results to be known
-rw-r--r--lib/chef/chef_fs/parallelizer.rb103
-rw-r--r--spec/unit/chef_fs/parallelizer.rb28
2 files changed, 66 insertions, 65 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 2c58323543..5b575bb207 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -78,7 +78,6 @@ class Chef
# NOTE: If you set this to false, parallelizer.kill will stop each()
# in its tracks, so you need to know for sure that won't happen.
def initialize(parent_task_queue, enumerable, options, &block)
-
@parent_task_queue = parent_task_queue
@enumerable = enumerable
@options = options
@@ -91,72 +90,71 @@ class Chef
end
def each
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output
- end
+ each_with_input do |output, index, input, type|
+ yield output
end
end
def each_with_index
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output, index
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output, index
+ each_with_input do |output, index, input|
+ yield output, index
+ end
+ end
+
+ def each_with_input
+ exception = nil
+ each_with_exceptions do |output, index, input, type|
+ if type == :exception
+ if @options[:ordered] == false
+ exception ||= output
+ else
+ raise output
+ end
+ else
+ yield output, index, input
end
end
+ raise exception if exception
end
- def each_with_input(&block)
+ def each_with_exceptions(&block)
if @options[:ordered] == false
- each_with_input_unordered(&block)
+ each_with_exceptions_unordered(&block)
else
- each_with_input_ordered(&block)
+ each_with_exceptions_ordered(&block)
end
end
def wait
- each_with_input_unordered do |type, input, output, index|
+ exception = nil
+ each_with_exceptions_unordered do |output, index, input, type|
+ exception ||= output if type == :exception
end
+ raise exception if exception
end
- def each_with_exceptions
- end
+ private
- def each_with_input_unordered
+ def each_with_exceptions_unordered
# Grab all the inputs, yielding any responses during enumeration
# in case the enumeration itself takes time
- exception = nil
-
begin
@enumerable.each_with_index do |input, index|
@unconsumed_input.push([ input, index ])
@parent_task_queue.push(method(:process_one))
no_more_inputs = false
while !@unconsumed_output.empty?
- type, input, output, index = @unconsumed_output.pop
- if type == :exception
- exception ||= output
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- no_more_inputs = true
- end
- else
- yield input, output, index
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ no_more_inputs = true
end
end
break if no_more_inputs
end
rescue
# We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([:exception, nil, $!, nil])
+ @unconsumed_output.push([$!, nil, nil, :exception])
if @options[:stop_on_exception]
@unconsumed_input.clear
end
@@ -169,15 +167,7 @@ class Chef
end
while !@unconsumed_output.empty?
- type, input, output, index = @unconsumed_output.pop
- if type == :exception
- exception ||= output
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- else
- yield input, output, index
- end
+ yield @unconsumed_output.pop
end
# If no one is working on our tasks and we're allowed to
@@ -187,27 +177,25 @@ class Chef
process_one
end
end
-
- if exception
- raise exception
- end
end
- def each_with_input_ordered
+ def each_with_exceptions_ordered
next_to_yield = 0
unconsumed = {}
- each_with_input_unordered do |input, output, index|
- unconsumed[index] = [ input, output ]
+ each_with_exceptions_unordered do |output, index, input, type|
+ unconsumed[index] = [ output, input, type ]
while unconsumed[next_to_yield]
input_output = unconsumed.delete(next_to_yield)
- yield input_output[0], input_output[1], next_to_yield
+ yield input_output[0], next_to_yield, input_output[1], input_output[2]
next_to_yield += 1
end
end
+ input_exception = unconsumed.delete(nil)
+ if input_exception
+ yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
+ end
end
- private
-
def process_one
@in_process += 1
begin
@@ -224,9 +212,12 @@ class Chef
def process_input(input, index)
begin
output = @block.call(input)
- @unconsumed_output.push([ :result, input, output, index ])
+ @unconsumed_output.push([ output, index, input, :result ])
rescue
- @unconsumed_output.push([ :exception, input, $!, index ])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ @unconsumed_output.push([ $!, index, input, :exception ])
end
index
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index 7823c307f9..5821721703 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -105,14 +105,16 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions with :stop_on_exception are raised after all processing is done" do
processed = 0
parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x|
- raise 'hi' if x == 'x'
+ if x == 'x'
+ sleep(0.1)
+ raise 'hi'
+ end
sleep(x)
processed += 1
x
end
expect { parallelized.to_a }.to raise_error 'hi'
- processed.should <= 5
- processed.should >= 2
+ processed.should == 4
end
end
@@ -154,16 +156,22 @@ describe Chef::ChefFS::Parallelizer do
elapsed_time.should < 0.7
end
- it "Exceptions in output are raised in the correct sequence but do NOT stop processing" do
+ it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do
processed = 0
enum = parallelize([0.2,0.1,'x',0.3]) do |x|
+ if x == 'x'
+ while processed < 3
+ sleep(0.05)
+ end
+ raise 'hi'
+ end
sleep(x)
processed += 1
x
end.enum_for(:each)
enum.next.should == 0.2
enum.next.should == 0.1
- expect { enum.next }.to raise_error
+ expect { enum.next }.to raise_error 'hi'
elapsed_time.should > 0.25
elapsed_time.should < 0.55
processed.should == 3
@@ -171,15 +179,17 @@ describe Chef::ChefFS::Parallelizer do
it "Exceptions with :stop_on_exception are raised after all processing is done" do
processed = 0
- parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :stop_on_exception => true) do |x|
- raise 'hi' if x == 'x'
+ parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x|
+ if x == 'x'
+ sleep(0.1)
+ raise 'hi'
+ end
sleep(x)
processed += 1
x
end
expect { parallelized.to_a }.to raise_error 'hi'
- processed.should <= 5
- processed.should >= 2
+ processed.should == 4
end
end
end