summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-03-29 13:13:16 -0600
committerJamis Buck <jamis@37signals.com>2008-03-29 13:13:16 -0600
commit606a0a2092c2b605d03722efb14cb9039bb8dcb8 (patch)
tree281882567edb45c82d2b6b463f3c08b6a3941c62 /lib
parentd911a09547df9ec3d2d39b4eebdc5f79a2091070 (diff)
downloadnet-ssh-multi-606a0a2092c2b605d03722efb14cb9039bb8dcb8.tar.gz
push server management logic into its own class
Diffstat (limited to 'lib')
-rw-r--r--lib/net/ssh/multi/server.rb92
-rw-r--r--lib/net/ssh/multi/session.rb189
2 files changed, 145 insertions, 136 deletions
diff --git a/lib/net/ssh/multi/server.rb b/lib/net/ssh/multi/server.rb
new file mode 100644
index 0000000..e6f337b
--- /dev/null
+++ b/lib/net/ssh/multi/server.rb
@@ -0,0 +1,92 @@
+require 'net/ssh'
+
+module Net; module SSH; module Multi
+ class Server
+ attr_reader :host
+ attr_reader :user
+ attr_reader :options
+ attr_reader :gateway
+
+ def initialize(host, user, options={})
+ @host = host
+ @user = user
+ @options = options.dup
+ @gateway = @options.delete(:via)
+ end
+
+ def port
+ options[:port] || 22
+ end
+
+ def eql?(server)
+ host == server.host &&
+ user == server.user &&
+ port == server.port
+ end
+
+ alias :== :eql?
+
+ def hash
+ @hash ||= [host, user, port].hash
+ end
+
+ def to_s
+ @to_s ||= begin
+ s = "#{user}@#{host}"
+ s << ":#{options[:port]}" if options[:port]
+ s
+ end
+ end
+
+ def session(ensure_open=false)
+ return @session if @session || !ensure_open
+ @session ||= begin
+ session = if gateway
+ gateway.ssh(host, user, options)
+ else
+ Net::SSH.start(host, user, options)
+ end
+
+ session[:server] = self
+ session
+ end
+ rescue Net::SSH::AuthenticationFailed => error
+ raise Net::SSH::AuthenticationFailed.new("#{error.message}@#{host}")
+ end
+
+ def close_channels
+ session.channels.each { |id, channel| channel.close } if session
+ end
+
+ def close
+ session.transport.close if session
+ end
+
+ def busy?(include_invisible=false)
+ session && session.busy?(include_invisible)
+ end
+
+ def preprocess(&block)
+ return true unless session
+ session.preprocess(&block)
+ end
+
+ def readers
+ return [] unless session
+ session.listeners.keys
+ end
+
+ def writers
+ return [] unless session
+ session.listeners.keys.select do |io|
+ io.respond_to?(:pending_write?) && io.pending_write?
+ end
+ end
+
+ def postprocess(readers, writers)
+ return true unless session
+ listeners = session.listeners.keys
+ session.postprocess(listeners & readers, listeners & writers)
+ end
+ end
+end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index 497d391..efcd7fe 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,16 +1,16 @@
require 'thread'
-require 'net/ssh'
require 'net/ssh/gateway'
+require 'net/ssh/multi/server'
require 'net/ssh/multi/channel'
module Net; module SSH; module Multi
class Session
- attr_reader :connections
- attr_reader :gateway
+ attr_reader :servers
+ attr_reader :default_gateway
attr_reader :groups
def initialize
- @connections = []
+ @servers = []
@groups = {}
@gateway = nil
@connections_mutex = Mutex.new
@@ -21,58 +21,37 @@ module Net; module SSH; module Multi
def group(*args)
mapping = args.last.is_a?(Hash) ? args.pop : {}
- begin
- saved_groups = active_groups.dup
- active_groups.concat(args.map { |a| a.to_sym }).uniq!
-
+ if mapping.any? && block_given?
+ raise ArgumentError, "must provide group mapping OR block, not both"
+ elsif block_given?
+ begin
+ saved_groups = active_groups.dup
+ active_groups.concat(args.map { |a| a.to_sym }).uniq!
+ yield self if block_given?
+ ensure
+ active_groups.replace(saved_groups)
+ end
+ else
mapping.each do |key, value|
(active_groups + Array(key)).uniq.each do |grp|
(groups[grp.to_sym] ||= []).concat(Array(value))
end
end
-
- yield self if block_given?
- ensure
- active_groups.replace(saved_groups)
- end
- end
-
- def via(*args)
- if connection_specification?(args)
- @gateway = Net::SSH::Gateway.new(*args)
- elsif args.length == 1
- @gateway = args.first
- else
- raise ArgumentError, "expected either a connection specification or a Net::SSH::Gateway instance"
end
- self
end
- def use(*list)
- connections.concat(list.each { |c| c[:host] = c.host })
- group(active_groups => list)
+ def via(host, user, options={})
+ @default_gateway = Net::SSH::Gateway.new(host, user, options)
self
end
- def connect(*args)
- if connection_specification?(args)
- establish_connection(*args)
- elsif args.any?
- raise ArgumentError, "expected either a connection specification or a block"
+ def use(host, user, options={})
+ server = Server.new(host, user, {:via => default_gateway}.merge(options))
+ unless servers.include?(server)
+ servers << server
+ group [] => server
end
-
- if block_given?
- collector = Collector.new
- yield collector
-
- threads = collector.specifications.map do |spec|
- Thread.new { establish_connection(spec.host, spec.user, spec.options, spec.groups) }
- end
-
- threads.each { |t| t.join }
- end
-
- self
+ server
end
def with(*groups)
@@ -83,23 +62,30 @@ module Net; module SSH; module Multi
active_groups.replace(saved_groups)
end
- def active_connections
- if active_groups.empty?
- connections
+ def active_sessions
+ list = if active_groups.empty?
+ servers
else
active_groups.map { |group| groups[group] }.flatten.uniq
end
+
+ sessions_for(list)
+ end
+
+ def connect!
+ active_sessions
+ self
end
def close
- connections.each { |connection| connection.channels.each { |id, channel| channel.close } }
+ servers.each { |server| server.close_channels }
loop(0) { busy?(true) }
- connections.each { |connection| connection.transport.close }
- gateway.shutdown! if gateway
+ servers.each { |server| server.close }
+ default_gateway.shutdown! if default_gateway
end
def busy?(include_invisible=false)
- @connections.any? { |connection| connection.busy?(include_invisible) }
+ servers.any? { |server| server.busy?(include_invisible) }
end
alias :loop_forever :loop
@@ -110,40 +96,26 @@ module Net; module SSH; module Multi
end
def process(wait=nil, &block)
- @connections.each { |c| return false unless c.preprocess(&block) }
-
- writers_by_connection, readers_by_connection = {}, {}
-
- writers = @connections.map do |c|
- c.listeners.keys.select do |w|
- writers_by_connection[c] ||= []
- writers_by_connection[c] << w
- w.respond_to?(:pending_write?) && w.pending_write?
- end
- end.flatten
+ return false if servers.any? { |server| !server.preprocess(&block) }
- readers = @connections.map { |c| readers_by_connection[c] = c.listeners.keys }.flatten
+ readers = servers.map { |s| s.readers }.flatten
+ writers = servers.map { |s| s.writers }.flatten
- readers, writers = IO.select(readers, writers, nil, wait)
+ readers, writers, = IO.select(readers, writers, nil, wait)
- @connections.each do |c|
- readers_for_this = readers_by_connection[c] & (readers || [])
- writers_for_this = writers_by_connection[c] & (writers || [])
- return false unless c.postprocess(readers_for_this, writers_for_this)
- end
-
- return true
+ return servers.all? { |server| server.postprocess(readers, writers) }
end
def send_global_request(type, *extra, &callback)
- active_connections.each { |connection| connection.send_global_request(type, *extra, &callback) }
+ active_sessions.each { |ssh| ssh.send_global_request(type, *extra, &callback) }
self
end
def open_channel(type="session", *extra, &on_confirm)
- channels = active_connections.map do |connection|
- channel = connection.open_channel(type, *extra, &on_confirm)
- channel[:host] = connection[:host]
+ channels = active_sessions.map do |ssh|
+ channel = ssh.open_channel(type, *extra, &on_confirm)
+ channel[:server] = ssh[:server]
+ channel[:host] = ssh[:server].host
channel
end
Multi::Channel.new(self, channels)
@@ -184,8 +156,8 @@ module Net; module SSH; module Multi
def exec!(command, &block)
block ||= Proc.new do |ch, type, data|
ch[:result] ||= {}
- ch[:result][ch.connection[:host]] ||= ""
- ch[:result][ch.connection[:host]] << data
+ ch[:result][ch[:server]] ||= ""
+ ch[:result][ch[:server]] << data
end
channel = exec(command, &block)
@@ -200,65 +172,10 @@ module Net; module SSH; module Multi
@active_groups
end
- def connection_specification?(args)
- args.length == 2 || (args.length == 3 && args.last.is_a?(Hash))
- end
-
- def establish_connection(host, user, options, groups=[])
- connection = gateway ? gateway.ssh(host, user, options) :
- Net::SSH.start(host, user, options)
- connection[:host] = host
- @connections_mutex.synchronize { connections.push(connection) }
- @groups_mutex.synchronize { group((active_groups + groups).uniq => connection) }
- return connection
- rescue Net::SSH::AuthenticationFailed => error
- error.message << "@#{host}"
- raise
- end
-
- class Collector
- class Specification
- attr_reader :host, :user, :options
- attr_reader :groups
-
- def initialize(host, user, options, groups)
- @host, @user, @options = host, user, options.dup
- @groups = groups.dup
- end
- end
-
- attr_reader :specifications
-
- def initialize
- @specifications = []
- @active_groups = []
- end
-
- def to(host, user, options={})
- @specifications << Specification.new(host, user, options, @active_groups)
- @specifications.length - 1
- end
-
- def group(*args)
- mapping = args.last.is_a?(Hash) ? args.pop : {}
-
- begin
- saved_groups = @active_groups.dup
- @active_groups.concat(args.map { |a| a.to_sym }).uniq!
-
- mapping.each do |key, value|
- groups = (Array(key).map { |v| v.to_sym } + @active_groups).uniq
-
- Array(value).each do |id|
- @specifications[id].groups.concat(groups).uniq!
- end
- end
-
- yield self if block_given?
- ensure
- @active_groups.replace(saved_groups)
- end
- end
+ def sessions_for(servers)
+ threads = servers.map { |server| Thread.new { server.session(true) } }
+ threads.each { |thread| thread.join }
+ servers.map { |server| server.session }
end
end
end; end; end \ No newline at end of file