summaryrefslogtreecommitdiff
path: root/qpid/ruby/lib
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
committerTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
commitd66d576676f8fcc0ee3fc11fb34322f499c43ca8 (patch)
treee8dc1950a16336c085cc128176e9e15d7a8a91ac /qpid/ruby/lib
parentc44a4323bad9d2774e8d26049fd36c02441baede (diff)
downloadqpid-python-d66d576676f8fcc0ee3fc11fb34322f499c43ca8.tar.gz
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby - Added support for asynchronous method invocation - Added option to override timeout for method request and get request - Added exception handler in delegates.rb to catch Sasl errors - Added tests for the async method features git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby/lib')
-rw-r--r--qpid/ruby/lib/qpid/delegates.rb14
-rw-r--r--qpid/ruby/lib/qpid/qmf.rb61
2 files changed, 58 insertions, 17 deletions
diff --git a/qpid/ruby/lib/qpid/delegates.rb b/qpid/ruby/lib/qpid/delegates.rb
index 171f310e48..8d866e895f 100644
--- a/qpid/ruby/lib/qpid/delegates.rb
+++ b/qpid/ruby/lib/qpid/delegates.rb
@@ -200,10 +200,16 @@ module Qpid
start.mechanisms.each do |m|
mech_list += m + " "
end
- resp = Sasl.client_start(@saslConn, mech_list)
- ch.connection_start_ok(:client_properties => PROPERTIES,
- :mechanism => resp[2],
- :response => resp[1])
+ begin
+ resp = Sasl.client_start(@saslConn, mech_list)
+ ch.connection_start_ok(:client_properties => PROPERTIES,
+ :mechanism => resp[2],
+ :response => resp[1])
+ rescue exception
+ ch.connection_close(:message => $!.message)
+ @connection.failed = true
+ @connection.signal
+ end
end
def connection_secure(ch, secure)
diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb
index ee165305c3..ebca7ee5ab 100644
--- a/qpid/ruby/lib/qpid/qmf.rb
+++ b/qpid/ruby/lib/qpid/qmf.rb
@@ -58,9 +58,14 @@ module Qpid::Qmf
# Invoked when an event is raised
def event(broker, event); end
+ # Invoked when an agent heartbeat is received.
def heartbeat(agent, timestamp); end
+ # Invoked when the connection sequence reaches the point where broker information is available.
def broker_info(broker); end
+
+ # Invoked when a method response from an asynchronous method call is received.
+ def method_response(broker, seq, response); end
end
class BrokerURL
@@ -105,7 +110,7 @@ module Qpid::Qmf
CONTEXT_STARTUP = 2
CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
include MonitorMixin
@@ -305,11 +310,17 @@ module Qpid::Qmf
# Otherwise, the query will go to all agents.
#
# :agent = <agent> - supply an agent from the list returned by getAgents.
+ #
# If the get query is to be restricted to one broker (as opposed to
# all connected brokers), add the following argument:
#
# :broker = <broker> - supply a broker as returned by addBroker.
#
+ # The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ # use the following argument:
+ #
+ # :_timeout = <time in seconds>
+ #
# If additional arguments are supplied, they are used as property
# selectors, as long as their keys are strings. For example, if
# the argument "name" => "test" is supplied, only objects whose
@@ -389,9 +400,13 @@ module Qpid::Qmf
end
timeout = false
+ if kwargs.include?(:_timeout)
+ wait_time = kwargs[:_timeout]
+ else
+ wait_time = DEFAULT_GET_WAIT_TIME
+ end
synchronize do
- unless @cv.wait_for(GET_WAIT_TIME) {
- @sync_sequence_list.empty? || @error }
+ unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error }
@sync_sequence_list.each do |pending_seq|
@seq_mgr.release(pending_seq)
end
@@ -504,10 +519,11 @@ module Qpid::Qmf
def handle_method_resp(broker, codec, seq)
code = codec.read_uint32
-
text = codec.read_str16
out_args = {}
- method, synchronous = @seq_mgr.release(seq)
+ pair = @seq_mgr.release(seq)
+ return unless pair
+ method, synchronous = pair
if code == 0
method.arguments.each do |arg|
if arg.dir.index(?O)
@@ -1054,7 +1070,7 @@ module Qpid::Qmf
private
- def send_method_request(method, name, args, synchronous = false)
+ def send_method_request(method, name, args, synchronous = false, time_wait = nil)
@schema.methods.each do |schema_method|
if name == schema_method.name
send_codec = Qpid::StringCodec.new(@broker.conn.spec)
@@ -1077,9 +1093,9 @@ module Qpid::Qmf
@session.encode_value(send_codec, actual, formal.type)
end
+ ttl = time_wait ? time_wait * 1000 : nil
smsg = @broker.message(send_codec.encoded,
- "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
-
+ "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl)
@broker.sync_start if synchronous
@broker.emit(smsg)
@@ -1089,8 +1105,25 @@ module Qpid::Qmf
end
def invoke(method, name, args)
- if send_method_request(method, name, args, synchronous = true)
- unless @broker.wait_for_sync_done
+ kwargs = args[args.size - 1]
+ sync = true
+ timeout = nil
+
+ if kwargs.class == Hash
+ if kwargs.include?(:_timeout)
+ timeout = kwargs[:_timeout]
+ end
+
+ if kwargs.include?(:_async)
+ sync = !kwargs[:_async]
+ end
+ args.pop
+ end
+
+ seq = send_method_request(method, name, args, synchronous = sync)
+ if seq
+ return seq unless sync
+ unless @broker.wait_for_sync_done(timeout)
@session.seq_mgr.release(seq)
raise "Timed out waiting for method to respond"
end
@@ -1284,9 +1317,10 @@ module Qpid::Qmf
end
end
- def wait_for_sync_done
+ def wait_for_sync_done(timeout=nil)
+ wait_time = timeout ? timeout : SYNC_TIME
synchronize do
- return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
+ return @cv.wait_for(wait_time) { ! @sync_in_flight || @error }
end
end
@@ -1309,9 +1343,10 @@ module Qpid::Qmf
codec.write_uint32(seq)
end
- def message(body, routing_key="broker")
+ def message(body, routing_key="broker", ttl=nil)
dp = @amqp_session.delivery_properties
dp.routing_key = routing_key
+ dp.ttl = ttl if ttl
mp = @amqp_session.message_properties
mp.content_type = "x-application/qmf"
mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)