summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r--lib/chef/chef_fs/parallelizer.rb102
1 files changed, 0 insertions, 102 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
deleted file mode 100644
index c4d17a842d..0000000000
--- a/lib/chef/chef_fs/parallelizer.rb
+++ /dev/null
@@ -1,102 +0,0 @@
-require_relative "parallelizer/parallel_enumerable"
-
-class Chef
- module ChefFS
- # Tries to balance several guarantees, in order of priority:
- # - don't get deadlocked
- # - provide results in desired order
- # - provide results as soon as they are available
- # - process input as soon as possible
- class Parallelizer
- @@parallelizer = nil
- @@threads = 0
-
- def self.threads=(value)
- @@threads = value
- @@parallelizer.resize(value) if @@parallelizer
- end
-
- def self.parallelizer
- @@parallelizer ||= Parallelizer.new(@@threads)
- end
-
- def self.parallelize(enumerable, options = {}, &block)
- parallelizer.parallelize(enumerable, options, &block)
- end
-
- def self.parallel_do(enumerable, options = {}, &block)
- parallelizer.parallel_do(enumerable, options, &block)
- end
-
- def initialize(num_threads)
- @tasks = Queue.new
- @threads = []
- @stop_thread = {}
- resize(num_threads)
- end
-
- def num_threads
- @threads.size
- end
-
- def parallelize(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options, &block)
- end
-
- def parallel_do(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options.merge(ordered: false), &block).wait
- end
-
- def stop(wait = true, timeout = nil)
- resize(0, wait, timeout)
- end
-
- def resize(to_threads, wait = true, timeout = nil)
- if to_threads < num_threads
- threads_to_stop = @threads[to_threads..num_threads - 1]
- @threads = @threads.slice(0, to_threads)
- threads_to_stop.each do |thread|
- @stop_thread[thread] = true
- end
-
- if wait
- start_time = Time.now
- threads_to_stop.each do |thread|
- thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
- thread.join(thread_timeout)
- end
- end
-
- else
- num_threads.upto(to_threads - 1) do |i|
- @threads[i] = Thread.new(&method(:worker_loop))
- end
- end
- end
-
- def kill
- @threads.each do |thread|
- Thread.kill(thread)
- @stop_thread.delete(thread)
- end
- @threads = []
- end
-
- private
-
- def worker_loop
- until @stop_thread[Thread.current]
- begin
- task = @tasks.pop
- task.call
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
- end
- end
- ensure
- @stop_thread.delete(Thread.current)
- end
- end
- end
-end