summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 15:05:25 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 15:05:25 -0700
commitd6f24954a16aa5e2e60050e1344265353a381428 (patch)
tree7f65ec13eb62982f64ce4477976c084370ea7a16
parent15d4ff065806478d1c20bf78ce7c2a4c2fb74022 (diff)
downloadchef-d6f24954a16aa5e2e60050e1344265353a381428.tar.gz
Fix tests on 1.8.7
-rw-r--r--lib/chef/chef_fs/parallelizer.rb19
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb13
-rw-r--r--spec/unit/chef_fs/parallelizer.rb213
3 files changed, 114 insertions, 131 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 8e49e155df..116a626869 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -86,16 +86,19 @@ class Chef
private
def worker_loop
- while !@stop_thread[Thread.current]
- begin
- task = @tasks.pop
- task.call
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
+ begin
+ while !@stop_thread[Thread.current]
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
+ end
end
+ ensure
+ @stop_thread.delete(Thread.current)
end
- @stop_thread.delete(Thread.current)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index 7354bc5c82..8e50f361db 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -24,7 +24,7 @@ class Chef
@block = block
@unconsumed_input = Queue.new
- @in_process = 0
+ @in_process = {}
@unconsumed_output = Queue.new
end
@@ -191,7 +191,7 @@ class Chef
# If no one is working on our tasks and we're allowed to
# work on them in the main thread, process an input to
# move things forward.
- if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
process_one
end
end
@@ -225,7 +225,7 @@ class Chef
def stop
@unconsumed_input.clear
- while @in_process > 0
+ while @in_process.size > 0
sleep(0.05)
end
@unconsumed_output.clear
@@ -244,11 +244,11 @@ class Chef
# existing outputs to the user.
#
def finished?
- @unconsumed_input.empty? && @in_process == 0 && @unconsumed_output.empty?
+ @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty?
end
def process_one
- @in_process += 1
+ @in_process[Thread.current] = true
begin
begin
input, index = @unconsumed_input.pop(true)
@@ -256,7 +256,7 @@ class Chef
rescue ThreadError
end
ensure
- @in_process -= 1
+ @in_process.delete(Thread.current)
end
end
@@ -264,7 +264,6 @@ class Chef
begin
output = @block.call(input)
@unconsumed_output.push([ output, index, input, :result ])
-
rescue
if @options[:stop_on_exception]
@unconsumed_input.clear
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index 5bc2ecfd17..a871b60e98 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -2,21 +2,6 @@ require 'spec_helper'
require 'chef/chef_fs/parallelizer'
describe Chef::ChefFS::Parallelizer do
- class EnumerableWithException
- include Enumerable
-
- def initialize(*results)
- @results = results
- end
-
- def each
- @results.each do |x|
- yield x
- end
- raise 'hi'
- end
- end
-
before :each do
@start_time = Time.now
end
@@ -68,37 +53,39 @@ describe Chef::ChefFS::Parallelizer do
enum = parallelize([0.5,0.3,0.1], :ordered => false) do |val|
sleep val
val
- end.enum_for(:each_with_index)
- enum.next.should == [ 0.1, 2 ]
- elapsed_time.should < 0.2
- enum.next.should == [ 0.3, 1 ]
- elapsed_time.should < 0.4
- enum.next.should == [ 0.5, 0 ]
- elapsed_time.should < 0.6
+ end
+ enum.map do |value|
+ elapsed_time.should < value+0.1
+ value
+ end.should == [ 0.1, 0.3, 0.5 ]
end
it "An exception in input is passed through but does NOT stop processing" do
- enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1), :ordered => false) { |x| sleep(x); x }.enum_for(:each)
- enum.next.should == 0.1
- enum.next.should == 0.3
- enum.next.should == 0.5
- expect { enum.next }.to raise_error 'hi'
+ input = TestEnumerable.new(0.5,0.3,0.1) do
+ raise 'hi'
+ end
+ enum = parallelize(input, :ordered => false) { |x| sleep(x); x }
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.should == [ 0.1, 0.3, 0.5 ]
elapsed_time.should < 0.6
end
it "Exceptions in output are raised after all processing is done" do
processed = 0
- enum = parallelize([0.2,0.1,'x',0.3], :ordered => false) do |x|
- sleep(x)
+ enum = parallelize([1,2,'x',3], :ordered => false) do |x|
+ if x == 'x'
+ sleep 0.1
+ raise 'hi'
+ end
+ sleep 0.2
processed += 1
x
- end.enum_for(:each)
- enum.next.should == 0.1
- enum.next.should == 0.2
+ end
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.sort.should == [ 1, 2, 3 ]
elapsed_time.should < 0.3
- enum.next.should == 0.3
- expect { enum.next }.to raise_error
- elapsed_time.should < 0.4
processed.should == 3
end
@@ -116,7 +103,6 @@ describe Chef::ChefFS::Parallelizer do
expect { parallelized.to_a }.to raise_error 'hi'
processed.should == 4
end
-
end
context "With :ordered => true (ordered output)" do
@@ -147,33 +133,31 @@ describe Chef::ChefFS::Parallelizer do
end
it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do
- enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1)) { |x| sleep(x); x }.enum_for(:each)
- enum.next.should == 0.5
- elapsed_time.should < 0.7
- enum.next.should == 0.3
- enum.next.should == 0.1
- expect { enum.next }.to raise_error 'hi'
- elapsed_time.should < 0.7
+ input = TestEnumerable.new(0.5,0.3,0.1) do
+ raise 'hi'
+ end
+ results = []
+ enum = parallelize(input) { |x| sleep(x); x }
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ elapsed_time.should < 0.6
+ results.should == [ 0.5, 0.3, 0.1 ]
end
it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do
processed = 0
- enum = parallelize([0.2,0.1,'x',0.3]) do |x|
+ enum = parallelize([1,2,'x',3]) do |x|
if x == 'x'
- while processed < 3
- sleep(0.05)
- end
+ sleep(0.1)
raise 'hi'
end
- sleep(x)
+ sleep(0.2)
processed += 1
x
- end.enum_for(:each)
- enum.next.should == 0.2
- enum.next.should == 0.1
- expect { enum.next }.to raise_error 'hi'
- elapsed_time.should > 0.25
- elapsed_time.should < 0.55
+ end
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.should == [ 1, 2 ]
+ elapsed_time.should < 0.3
processed.should == 3
end
@@ -193,28 +177,20 @@ describe Chef::ChefFS::Parallelizer do
end
end
- class SlowEnumerable
- def initialize(*values)
- @values = values
- end
- include Enumerable
- def each
- @values.each do |value|
- yield value
- sleep 0.1
- end
- end
- end
-
it "When the input is slow, output still proceeds" do
- enum = parallelize(SlowEnumerable.new(1,2,3)) { |x| x }.enum_for(:each)
- enum.next.should == 1
- elapsed_time.should < 0.2
- enum.next.should == 2
- elapsed_time.should < 0.3
- enum.next.should == 3
- elapsed_time.should < 0.4
- expect { enum.next }.to raise_error StopIteration
+ input = TestEnumerable.new do |&block|
+ block.call(1)
+ sleep 0.1
+ block.call(2)
+ sleep 0.1
+ block.call(3)
+ sleep 0.1
+ end
+ enum = parallelize(input) { |x| x }
+ enum.map do |value|
+ elapsed_time.should < (value+1)*0.1
+ value
+ end.should == [ 1, 2, 3 ]
end
end
@@ -229,11 +205,14 @@ describe Chef::ChefFS::Parallelizer do
started = false
@occupying_job_finished = occupying_job_finished = [ false ]
@thread = Thread.new do
- parallelizer.parallelize([0], :main_thread_processing => false) do |x|
- started = true
- sleep(0.3)
- occupying_job_finished[0] = true
- end.wait
+ begin
+ parallelizer.parallelize([0], :main_thread_processing => false) do |x|
+ started = true
+ sleep(0.3)
+ occupying_job_finished[0] = true
+ end.wait
+ ensure
+ end
end
while !started
sleep(0.01)
@@ -241,7 +220,9 @@ describe Chef::ChefFS::Parallelizer do
end
after :each do
- Thread.kill(@thread)
+ if RUBY_VERSION.to_f > 1.8
+ Thread.kill(@thread)
+ end
end
it "parallelize with :main_thread_processing = true does not block" do
@@ -286,35 +267,10 @@ describe Chef::ChefFS::Parallelizer do
end
end
- class InputMapper
- include Enumerable
-
- def initialize(*values, &block)
- @values = values
- @block = block
- @num_processed = 0
- end
-
- attr_reader :num_processed
-
- def each
- @values.each do |value|
- @num_processed += 1
- yield value
- end
- if @block
- @block.call do |value|
- @num_processed += 1
- yield value
- end
- end
- end
- end
-
context "enumerable methods should run efficiently" do
it ".count does not process anything" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -327,7 +283,7 @@ describe Chef::ChefFS::Parallelizer do
it ".count with arguments works normally" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,1,1,1,2,2,2,3,3,4)
+ input_mapper = TestEnumerable.new(1,1,1,1,2,2,2,3,3,4)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
x
@@ -340,7 +296,7 @@ describe Chef::ChefFS::Parallelizer do
it ".first does not enumerate anything other than the first result(s)" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -354,7 +310,7 @@ describe Chef::ChefFS::Parallelizer do
it ".take does not enumerate anything other than the first result(s)" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -367,7 +323,7 @@ describe Chef::ChefFS::Parallelizer do
it ".drop does not process anything other than the last result(s)" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -381,7 +337,7 @@ describe Chef::ChefFS::Parallelizer do
if Enumerable.method_defined?(:lazy)
it ".lazy.take does not enumerate anything other than the first result(s)" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -394,7 +350,7 @@ describe Chef::ChefFS::Parallelizer do
it ".drop does not process anything other than the last result(s)" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -407,7 +363,7 @@ describe Chef::ChefFS::Parallelizer do
it "lazy enumerable is actually lazy" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3,4,5,6)
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
sleep(0.05) # Just enough to yield and get other inputs in the queue
@@ -425,7 +381,7 @@ describe Chef::ChefFS::Parallelizer do
context "running enumerable multiple times should function correctly" do
it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3)
+ input_mapper = TestEnumerable.new(1,2,3)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
x
@@ -438,7 +394,7 @@ describe Chef::ChefFS::Parallelizer do
it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do
outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3)
+ input_mapper = TestEnumerable.new(1,2,3)
enum = parallelizer.parallelize(input_mapper) do |x|
outputs_processed += 1
x
@@ -498,4 +454,29 @@ describe Chef::ChefFS::Parallelizer do
outputs.each { |output| output.sort.should == 2.upto(501).to_a }
end
end
+
+ class TestEnumerable
+ include Enumerable
+
+ def initialize(*values, &block)
+ @values = values
+ @block = block
+ @num_processed = 0
+ end
+
+ attr_reader :num_processed
+
+ def each(&each_block)
+ @values.each do |value|
+ @num_processed += 1
+ each_block.call(value)
+ end
+ if @block
+ @block.call do |value|
+ @num_processed += 1
+ each_block.call(value)
+ end
+ end
+ end
+ end
end