diff options
Diffstat (limited to 'lib/net/ssh/multi/session.rb')
-rw-r--r-- | lib/net/ssh/multi/session.rb | 108 |
1 files changed, 103 insertions, 5 deletions
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb index cffe972..99819e1 100644 --- a/lib/net/ssh/multi/session.rb +++ b/lib/net/ssh/multi/session.rb @@ -1,6 +1,8 @@ +require 'thread' require 'net/ssh/gateway' require 'net/ssh/multi/server' require 'net/ssh/multi/channel' +require 'net/ssh/multi/pending_connection' module Net; module SSH; module Multi # Represents a collection of connections to various servers. It provides an @@ -50,6 +52,13 @@ module Net; module SSH; module Multi # corresponding Net::SSH::Multi::Server definitions. attr_reader :groups + # The number of allowed concurrent connections. No more than this number + # of sessions will be open at any given time. + attr_accessor :concurrent_connections + + # The number of connections that are currently open. + attr_reader :open_connections #:nodoc: + # The list of "open" groups, which will receive subsequent server definitions. # See #use and #group. attr_reader :open_groups #:nodoc: @@ -61,12 +70,30 @@ module Net; module SSH; module Multi # Creates a new Net::SSH::Multi::Session instance. Initially, it contains # no server definitions, no group definitions, and no default gateway. - def initialize + # + # You can set the #concurrent_connections property in the options. Setting + # it to +nil+ (the default) will cause Net::SSH::Multi to ignore any + # concurrent connection limit and allow all defined sessions to be open + # simultaneously. Setting it to an integer will cause Net::SSH::Multi to + # allow no more than that number of concurrently open sessions, opening + # subsequent sessions only when other sessions finish and close. + # + # Net::SSH::Multi.start(:concurrent_connections => 10) do |session| + # session.use ... + # end + def initialize(options={}) @servers = [] @groups = {} @gateway = nil @active_groups = {} @open_groups = [] + @connect_threads = [] + + @open_connections = 0 + @pending_sessions = [] + @session_mutex = Mutex.new + + options.each { |opt, value| send("#{opt}=", value) } end # At its simplest, this associates a named group with a server definition. @@ -144,7 +171,7 @@ module Net; module SSH; module Multi # session.use 'host2', 'user2', :via => nil # session.use 'host3', 'user3', :via => Net::SSH::Gateway.new('gateway.host', 'user') def use(host, user, options={}) - server = Server.new(host, user, {:via => default_gateway}.merge(options)) + server = Server.new(self, host, user, {:via => default_gateway}.merge(options)) exists = servers.index(server) if exists server = servers[exists] @@ -313,6 +340,9 @@ module Net; module SSH; module Multi # whether #process returns +true+ (the block did not return +false+), or # +false+ (the block returned +false+). def process(wait=nil, &block) + realize_pending_connections! + wait = @connect_threads.any? ? 0 : wait + return false unless preprocess(&block) readers = servers.map { |s| s.readers }.flatten @@ -320,7 +350,11 @@ module Net; module SSH; module Multi readers, writers, = IO.select(readers, writers, nil, wait) - return postprocess(readers, writers) + if readers + return postprocess(readers, writers) + else + return true + end end # Sends a global request to all active sessions (see #active_sessions). @@ -365,8 +399,8 @@ module Net; module SSH; module Multi def open_channel(type="session", *extra, &on_confirm) channels = active_sessions.map do |ssh| ssh.open_channel(type, *extra) do |c| - c[:server] = ssh[:server] - c[:host] = ssh[:server].host + c[:server] = c.connection[:server] + c[:host] = c.connection[:server].host on_confirm[c] if on_confirm end end @@ -448,5 +482,69 @@ module Net; module SSH; module Multi servers.each { |server| server.postprocess(readers, writers) } true end + + # Takes the #concurrent_connections property into account, and tries to + # return a new session for the given server. If the concurrent connections + # limit has been reached, then a Net::SSH::Multi::PendingConnection instance + # will be returned instead, which will be realized into an actual session + # as soon as a slot opens up. + # + # If +force+ is true, the concurrent_connections check is skipped and a real + # connection is always returned. + def next_session(server, force=false) #:nodoc: + @session_mutex.synchronize do + if !force && concurrent_connections && concurrent_connections <= open_connections + connection = PendingConnection.new(server) + @pending_sessions << connection + return connection + end + + @open_connections += 1 + end + + begin + server.new_session + rescue Exception => e + @session_mutex.synchronize { @open_connections -= 1 } + raise + end + end + + # Tells the session that the given server has closed its connection. The + # session indicates that a new connection slot is available, which may be + # filled by the next pending connection on the next event loop iteration. + def server_closed(server) #:nodoc: + @session_mutex.synchronize do + unless @pending_sessions.delete(server.session) + @open_connections -= 1 + end + end + end + + # Invoked by the event loop. If there is a concurrent_connections limit in + # effect, this will close any non-busy sessions and try to open as many + # new sessions as it can. It does this in threads, so that existing processing + # can continue. + # + # If there is no concurrent_connections limit in effect, then this method + # does nothing. + def realize_pending_connections! #:nodoc: + return unless concurrent_connections + + servers.each do |s| + s.close if !s.busy?(true) + s.update_session! + end + + @connect_threads.delete_if { |t| !t.alive? } + + count = concurrent_connections ? (concurrent_connections - open_connections) : @pending_sessions.length + count.times do + session = @pending_sessions.pop or break + @connect_threads << Thread.new do + session.replace_with(next_session(session.server, true)) + end + end + end end end; end; end
\ No newline at end of file |