diff options
| author | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
| commit | d66d576676f8fcc0ee3fc11fb34322f499c43ca8 (patch) | |
| tree | e8dc1950a16336c085cc128176e9e15d7a8a91ac /qpid/ruby/lib | |
| parent | c44a4323bad9d2774e8d26049fd36c02441baede (diff) | |
| download | qpid-python-d66d576676f8fcc0ee3fc11fb34322f499c43ca8.tar.gz | |
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby
- Added support for asynchronous method invocation
- Added option to override timeout for method request and get request
- Added exception handler in delegates.rb to catch Sasl errors
- Added tests for the async method features
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby/lib')
| -rw-r--r-- | qpid/ruby/lib/qpid/delegates.rb | 14 | ||||
| -rw-r--r-- | qpid/ruby/lib/qpid/qmf.rb | 61 |
2 files changed, 58 insertions, 17 deletions
diff --git a/qpid/ruby/lib/qpid/delegates.rb b/qpid/ruby/lib/qpid/delegates.rb index 171f310e48..8d866e895f 100644 --- a/qpid/ruby/lib/qpid/delegates.rb +++ b/qpid/ruby/lib/qpid/delegates.rb @@ -200,10 +200,16 @@ module Qpid start.mechanisms.each do |m| mech_list += m + " " end - resp = Sasl.client_start(@saslConn, mech_list) - ch.connection_start_ok(:client_properties => PROPERTIES, - :mechanism => resp[2], - :response => resp[1]) + begin + resp = Sasl.client_start(@saslConn, mech_list) + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => resp[2], + :response => resp[1]) + rescue exception + ch.connection_close(:message => $!.message) + @connection.failed = true + @connection.signal + end end def connection_secure(ch, secure) diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb index ee165305c3..ebca7ee5ab 100644 --- a/qpid/ruby/lib/qpid/qmf.rb +++ b/qpid/ruby/lib/qpid/qmf.rb @@ -58,9 +58,14 @@ module Qpid::Qmf # Invoked when an event is raised def event(broker, event); end + # Invoked when an agent heartbeat is received. def heartbeat(agent, timestamp); end + # Invoked when the connection sequence reaches the point where broker information is available. def broker_info(broker); end + + # Invoked when a method response from an asynchronous method call is received. + def method_response(broker, seq, response); end end class BrokerURL @@ -105,7 +110,7 @@ module Qpid::Qmf CONTEXT_STARTUP = 2 CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 include MonitorMixin @@ -305,11 +310,17 @@ module Qpid::Qmf # Otherwise, the query will go to all agents. # # :agent = <agent> - supply an agent from the list returned by getAgents. + # # If the get query is to be restricted to one broker (as opposed to # all connected brokers), add the following argument: # # :broker = <broker> - supply a broker as returned by addBroker. # + # The default timeout for this synchronous operation is 60 seconds. To change the timeout, + # use the following argument: + # + # :_timeout = <time in seconds> + # # If additional arguments are supplied, they are used as property # selectors, as long as their keys are strings. For example, if # the argument "name" => "test" is supplied, only objects whose @@ -389,9 +400,13 @@ module Qpid::Qmf end timeout = false + if kwargs.include?(:_timeout) + wait_time = kwargs[:_timeout] + else + wait_time = DEFAULT_GET_WAIT_TIME + end synchronize do - unless @cv.wait_for(GET_WAIT_TIME) { - @sync_sequence_list.empty? || @error } + unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error } @sync_sequence_list.each do |pending_seq| @seq_mgr.release(pending_seq) end @@ -504,10 +519,11 @@ module Qpid::Qmf def handle_method_resp(broker, codec, seq) code = codec.read_uint32 - text = codec.read_str16 out_args = {} - method, synchronous = @seq_mgr.release(seq) + pair = @seq_mgr.release(seq) + return unless pair + method, synchronous = pair if code == 0 method.arguments.each do |arg| if arg.dir.index(?O) @@ -1054,7 +1070,7 @@ module Qpid::Qmf private - def send_method_request(method, name, args, synchronous = false) + def send_method_request(method, name, args, synchronous = false, time_wait = nil) @schema.methods.each do |schema_method| if name == schema_method.name send_codec = Qpid::StringCodec.new(@broker.conn.spec) @@ -1077,9 +1093,9 @@ module Qpid::Qmf @session.encode_value(send_codec, actual, formal.type) end + ttl = time_wait ? time_wait * 1000 : nil smsg = @broker.message(send_codec.encoded, - "agent.#{object_id.broker_bank}.#{object_id.agent_bank}") - + "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl) @broker.sync_start if synchronous @broker.emit(smsg) @@ -1089,8 +1105,25 @@ module Qpid::Qmf end def invoke(method, name, args) - if send_method_request(method, name, args, synchronous = true) - unless @broker.wait_for_sync_done + kwargs = args[args.size - 1] + sync = true + timeout = nil + + if kwargs.class == Hash + if kwargs.include?(:_timeout) + timeout = kwargs[:_timeout] + end + + if kwargs.include?(:_async) + sync = !kwargs[:_async] + end + args.pop + end + + seq = send_method_request(method, name, args, synchronous = sync) + if seq + return seq unless sync + unless @broker.wait_for_sync_done(timeout) @session.seq_mgr.release(seq) raise "Timed out waiting for method to respond" end @@ -1284,9 +1317,10 @@ module Qpid::Qmf end end - def wait_for_sync_done + def wait_for_sync_done(timeout=nil) + wait_time = timeout ? timeout : SYNC_TIME synchronize do - return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error } + return @cv.wait_for(wait_time) { ! @sync_in_flight || @error } end end @@ -1309,9 +1343,10 @@ module Qpid::Qmf codec.write_uint32(seq) end - def message(body, routing_key="broker") + def message(body, routing_key="broker", ttl=nil) dp = @amqp_session.delivery_properties dp.routing_key = routing_key + dp.ttl = ttl if ttl mp = @amqp_session.message_properties mp.content_type = "x-application/qmf" mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) |
