diff options
Diffstat (limited to 'cpp/bindings/qpid/ruby/examples/drain.rb')
-rw-r--r-- | cpp/bindings/qpid/ruby/examples/drain.rb | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/cpp/bindings/qpid/ruby/examples/drain.rb b/cpp/bindings/qpid/ruby/examples/drain.rb new file mode 100644 index 0000000000..a6cf35e189 --- /dev/null +++ b/cpp/bindings/qpid/ruby/examples/drain.rb @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$:.unshift File.join(File.dirname(__FILE__), "..", "lib") + +require 'qpid' +require 'optparse' + +options = { + :broker => "localhost", + :timeout => Qpid::Messaging::Duration::IMMEDIATE, + :count => 1, + :forever => false, + :connection_options => "" +} + +opts = OptionParser.new do |opts| + opts.banner = "Usage: drain.rb [OPTIONS] ADDRESS" + + opts.separator "" + opts.separator "Drains messages from the specified address" + opts.separator "" + + opts.on("-h", "--help", + "show this message") do + puts opts + exit + end + + opts.on("-b", "--broker VALUE", + "url of broker to connect to") do |broker| + options[:broker] = broker + end + + opts.on("-t", "--timeout VALUE", Integer, + "timeout in seconds to wait before exiting") do |timeout| + options[:timeout] = Qpid::Messaging::Duration.new timeout * 1000 + end + + opts.on("-f", "--forever", + "ignore timeout and wait forever") do + options[:forever] = true + end + + opts.on("--connection-options VALUE", + "connection options string in the form {name1:value,name2:value2}") do |conopts| + options[:connection_options] = conopts + end + + opts.on("-c", "--count VALUE", Integer, + "number of messages to read before exiting") do |count| + options[:count] = count + end +end + +opts.parse!(ARGV) + +options[:address] = ARGV[0] || "" + +connection = Qpid::Messaging::Connection.new options[:broker], options[:connection_options] +connection.open + +def render_map map + print "{" + map.keys.sort.each_with_index {|key,index| print "#{index > 0 ? ', ' : ''}#{key}:#{map[key]}"} + print "}" +end + +begin + session = connection.create_session + receiver = session.create_receiver options[:address] + done = false + count = 0 + options[:timeout] = Qpid::Messaging::Duration::FOREVER if options[:forever] + + while !done && (count < options[:count]) + message = receiver.fetch(options[:timeout]) + print "Message(properties=" + render_map message.properties + print ", content=" + if message.content_type == "amqp/map" + print "'#{render_map message.content}')" + else + print "'#{message.content}'" + end + print ")\n" + session.acknowledge message + count += 1 + end +rescue Exception => error + puts "Exception: #{error.to_s}" +end + +connection.close + |