summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJamis Buck <jamis@37signals.com>2008-04-09 14:23:06 -0600
committerJamis Buck <jamis@37signals.com>2008-04-09 14:23:06 -0600
commit81d215f0764e473842dccd9b98b98aa08383cc7d (patch)
treed0caedf48ed711119857d91a6d7ac81c323716dc /lib
parentb97e7c775049cd5eac3656af04eab5dc7f42ee92 (diff)
downloadnet-ssh-multi-81d215f0764e473842dccd9b98b98aa08383cc7d.tar.gz
deferred server evaluation via session.use(&block)
Diffstat (limited to 'lib')
-rw-r--r--lib/net/ssh/multi/dynamic_server.rb37
-rw-r--r--lib/net/ssh/multi/server_list.rb61
-rw-r--r--lib/net/ssh/multi/session.rb68
3 files changed, 136 insertions, 30 deletions
diff --git a/lib/net/ssh/multi/dynamic_server.rb b/lib/net/ssh/multi/dynamic_server.rb
new file mode 100644
index 0000000..92af7a5
--- /dev/null
+++ b/lib/net/ssh/multi/dynamic_server.rb
@@ -0,0 +1,37 @@
+require 'net/ssh/multi/server'
+
+module Net; module SSH; module Multi
+
+ class DynamicServer
+ attr_reader :master
+ attr_reader :callback
+ attr_reader :options
+
+ def initialize(master, options, callback)
+ @master, @options, @callback = master, options, callback
+ @servers = nil
+ end
+
+ def [](key)
+ (options[:properties] || {})[key]
+ end
+
+ def each
+ (@servers || []).each { |server| yield server }
+ end
+
+ def evaluate!
+ @servers ||= Array(callback[options]).map do |server|
+ case server
+ when String then Net::SSH::Multi::Server.new(master, server, options)
+ else server
+ end
+ end
+ end
+
+ def to_ary
+ evaluate!
+ end
+ end
+
+end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/server_list.rb b/lib/net/ssh/multi/server_list.rb
new file mode 100644
index 0000000..1f83be3
--- /dev/null
+++ b/lib/net/ssh/multi/server_list.rb
@@ -0,0 +1,61 @@
+require 'net/ssh/multi/server'
+require 'net/ssh/multi/dynamic_server'
+
+module Net; module SSH; module Multi
+
+ class ServerList
+ include Enumerable
+
+ def initialize(list=[])
+ @list = list.uniq
+ end
+
+ def add(server)
+ index = @list.index(server)
+ if index
+ server = @list[index]
+ else
+ @list.push(server)
+ end
+ server
+ end
+
+ def concat(servers)
+ servers.each { |server| add(server) }
+ self
+ end
+
+ def each
+ @list.each do |server|
+ case server
+ when Server then yield server
+ when DynamicServer then server.each { |item| yield item }
+ else raise ArgumentError, "server list contains non-server: #{server.class}"
+ end
+ end
+ self
+ end
+
+ def select
+ subset = @list.select { |i| yield i }
+ ServerList.new(subset)
+ end
+
+ def flatten
+ result = @list.inject([]) do |aggregator, server|
+ case server
+ when Server then aggregator.push(server)
+ when DynamicServer then aggregator.concat(server)
+ else raise ArgumentError, "server list contains non-server: #{server.class}"
+ end
+ end
+
+ result.uniq
+ end
+
+ def to_ary
+ flatten
+ end
+ end
+
+end; end; end \ No newline at end of file
diff --git a/lib/net/ssh/multi/session.rb b/lib/net/ssh/multi/session.rb
index 0f9df0b..103e1c4 100644
--- a/lib/net/ssh/multi/session.rb
+++ b/lib/net/ssh/multi/session.rb
@@ -1,6 +1,8 @@
require 'thread'
require 'net/ssh/gateway'
require 'net/ssh/multi/server'
+require 'net/ssh/multi/dynamic_server'
+require 'net/ssh/multi/server_list'
require 'net/ssh/multi/channel'
require 'net/ssh/multi/pending_connection'
require 'net/ssh/multi/session_actions'
@@ -45,15 +47,15 @@ module Net; module SSH; module Multi
class Session
include SessionActions
- # The list of Net::SSH::Multi::Server definitions managed by this session.
- attr_reader :servers
+ # The Net::SSH::Multi::ServerList managed by this session.
+ attr_reader :server_list
# The default Net::SSH::Gateway instance to use to connect to the servers.
# If +nil+, no default gateway will be used.
attr_reader :default_gateway
- # The hash of group definitions, mapping each group name to the list of
- # corresponding Net::SSH::Multi::Server definitions.
+ # The hash of group definitions, mapping each group name to a corresponding
+ # Net::SSH::Multi::ServerList.
attr_reader :groups
# The number of allowed concurrent connections. No more than this number
@@ -92,8 +94,8 @@ module Net; module SSH; module Multi
# session.use ...
# end
def initialize(options={})
- @servers = []
- @groups = {}
+ @server_list = ServerList.new
+ @groups = Hash.new { |h,k| h[k] = ServerList.new }
@gateway = nil
@open_groups = []
@connect_threads = []
@@ -112,7 +114,7 @@ module Net; module SSH; module Multi
#
# First, you can use it to associate a group (or array of groups) with a
# server definition (or array of server definitions). The server definitions
- # must already exist in the #servers array (typically by calling #use):
+ # must already exist in the #server_list array (typically by calling #use):
#
# server1 = session.use('host1', 'user1')
# server2 = session.use('host2', 'user2')
@@ -149,7 +151,7 @@ module Net; module SSH; module Multi
else
mapping.each do |key, value|
(open_groups + Array(key)).uniq.each do |grp|
- (groups[grp.to_sym] ||= []).concat(Array(value)).uniq!
+ groups[grp.to_sym].concat(Array(value))
end
end
end
@@ -187,24 +189,29 @@ module Net; module SSH; module Multi
# server instances will be returned.
#
# server1, server2 = session.use "host1", "host2"
- def use(*hosts)
+ def use(*hosts, &block)
options = hosts.last.is_a?(Hash) ? hosts.pop : {}
+ options = { :via => default_gateway }.merge(options)
results = hosts.map do |host|
- server = Server.new(self, host, {:via => default_gateway}.merge(options))
- exists = servers.index(server)
- if exists
- server = servers[exists]
- else
- servers << server
- end
- server
+ server_list.add(Server.new(self, host, options))
+ end
+
+ if block
+ results << server_list.add(DynamicServer.new(self, options, block))
end
group [] => results
results.length > 1 ? results : results.first
end
+ # Essentially an alias for #servers_for without any arguments. This is used
+ # primarily to satistfy the expectations of the Net::SSH::Multi::SessionActions
+ # module.
+ def servers
+ servers_for
+ end
+
# Returns the set of servers that match the given criteria. It can be used
# in any (or all) of three ways.
#
@@ -244,7 +251,7 @@ module Net; module SSH; module Multi
# servers = session.servers_for(:app, :web, :db => { :only => { :primary => true } })
def servers_for(*criteria)
if criteria.empty?
- servers
+ server_list.flatten
else
# normalize the criteria list, so that every entry is a key to a
# criteria hash (possibly empty).
@@ -255,16 +262,17 @@ module Net; module SSH; module Multi
end
end
- list = criteria.inject([]) do |server_list, (group, properties)|
+ list = criteria.inject([]) do |aggregator, (group, properties)|
raise ArgumentError, "the value for any group must be a Hash, but got a #{properties.class} for #{group.inspect}" unless properties.is_a?(Hash)
bad_keys = properties.keys - [:only, :except]
raise ArgumentError, "unknown constraint(s) #{bad_keys.inspect} for #{group.inspect}" unless bad_keys.empty?
- servers = (groups[group] || []).select do |server|
+ servers = groups[group].select do |server|
(properties[:only] || {}).all? { |prop, value| server[prop] == value } &&
!(properties[:except] || {}).any? { |prop, value| server[prop] == value }
end
- server_list.concat(servers)
+
+ aggregator.concat(servers)
end
list.uniq
@@ -310,9 +318,9 @@ module Net; module SSH; module Multi
# gateway connections (e.g., those passed to #use directly) will _not_ be
# closed by this method, and must be managed externally.
def close
- servers.each { |server| server.close_channels }
+ server_list.each { |server| server.close_channels }
loop(0) { busy?(true) }
- servers.each { |server| server.close }
+ server_list.each { |server| server.close }
default_gateway.shutdown! if default_gateway
end
@@ -339,8 +347,8 @@ module Net; module SSH; module Multi
return false unless preprocess(&block)
- readers = servers.map { |s| s.readers }.flatten
- writers = servers.map { |s| s.writers }.flatten
+ readers = server_list.map { |s| s.readers }.flatten
+ writers = server_list.map { |s| s.writers }.flatten
readers, writers, = IO.select(readers, writers, nil, wait)
@@ -356,14 +364,14 @@ module Net; module SSH; module Multi
# This is called as part of the #process method.
def preprocess(&block) #:nodoc:
return false if block && !block[self]
- servers.each { |server| server.preprocess }
+ server_list.each { |server| server.preprocess }
block.nil? || block[self]
end
# Runs the postprocess stage on all servers. Always returns true. This is
# called as part of the #process method.
def postprocess(readers, writers) #:nodoc:
- servers.each { |server| server.postprocess(readers, writers) }
+ server_list.each { |server| server.postprocess(readers, writers) }
true
end
@@ -437,9 +445,9 @@ module Net; module SSH; module Multi
def realize_pending_connections! #:nodoc:
return unless concurrent_connections
- servers.each do |s|
- s.close if !s.busy?(true)
- s.update_session!
+ server_list.each do |server|
+ server.close if !server.busy?(true)
+ server.update_session!
end
@connect_threads.delete_if { |t| !t.alive? }