diff options
author | Andre Arko <andre@arko.net> | 2014-07-22 22:01:03 -0700 |
---|---|---|
committer | Andre Arko <andre@arko.net> | 2014-07-22 22:01:04 -0700 |
commit | b5444a3e50524c703062865badce8124f8957c48 (patch) | |
tree | d45f8c407e282f7bfd415fdd9e34331c5c1199ee /lib/bundler/worker.rb | |
parent | fb65ebe5aec8f67617031a0f1232b8ee5744411b (diff) | |
download | bundler-b5444a3e50524c703062865badce8124f8957c48.tar.gz |
Refactor workers completely.
Since we no longer have multiple types of workers, it was possible to
collapse the entire set of worker classes into a single Worker class.
While I was in there, I tried to simplify the structure of the worker by
breaking out individual tasks into methods.
I have no idea if this actually works in complex cases in the real
world, but it worked for me to install some gemfiles with threads.
Diffstat (limited to 'lib/bundler/worker.rb')
-rw-r--r-- | lib/bundler/worker.rb | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/lib/bundler/worker.rb b/lib/bundler/worker.rb new file mode 100644 index 0000000000..3922a57674 --- /dev/null +++ b/lib/bundler/worker.rb @@ -0,0 +1,71 @@ +module Bundler + class Worker + POISON = Object.new + + class WrappedException < StandardError + attr_reader :exception + def initialize(exn) + @exception = exn + end + end + + # Creates a worker pool of specified size + # + # @param size [Integer] Size of pool + # @param func [Proc] job to run in inside the worker pool + def initialize(size, func) + @request_queue = Queue.new + @response_queue = Queue.new + @func = func + @threads = size.times.map { |i| Thread.start { process_queue(i) } } + trap("INT") { abort_threads } + end + + # Enqueue a request to be executed in the worker pool + # + # @param obj [String] mostly it is name of spec that should be downloaded + def enq(obj) + @request_queue.enq obj + end + + # Retrieves results of job function being executed in worker pool + def deq + result = @response_queue.deq + raise result.exception if result.is_a?(WrappedException) + result + end + + def stop + stop_threads + end + + private + + def process_queue(i) + loop do + obj = @request_queue.deq + break if obj.equal? POISON + @response_queue.enq apply_func(obj, i) + end + end + + def apply_func(obj, i) + @func.call(obj, i) + rescue Exception => e + WrappedException.new(e) + end + + # Stop the worker threads by sending a poison object down the request queue + # so as worker threads after retrieving it, shut themselves down + def stop_threads + @threads.each { @request_queue.enq POISON } + @threads.each { |thread| thread.join } + end + + def abort_threads + @threads.each {|i| i.exit } + exit 1 + end + + end +end |