diff options
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/setup.py | 5 | ||||
-rwxr-xr-x | tools/src/py/qmf-tool | 148 | ||||
-rwxr-xr-x | tools/src/py/qpid-cluster | 9 | ||||
-rwxr-xr-x | tools/src/py/qpid-config | 145 | ||||
-rwxr-xr-x | tools/src/py/qpid-printevents | 28 | ||||
-rwxr-xr-x | tools/src/py/qpid-route | 62 | ||||
-rwxr-xr-x | tools/src/py/qpid-tool | 21 |
7 files changed, 323 insertions, 95 deletions
diff --git a/tools/setup.py b/tools/setup.py index 8811e49682..4231476c90 100755 --- a/tools/setup.py +++ b/tools/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tools", - version="0.9", + version="0.13", author="Apache Qpid", author_email="dev@qpid.apache.org", scripts=["src/py/qpid-cluster", @@ -30,7 +30,8 @@ setup(name="qpid-tools", "src/py/qpid-queue-stats", "src/py/qpid-route", "src/py/qpid-stat", - "src/py/qpid-tool"], + "src/py/qpid-tool", + "src/py/qmf-tool"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") diff --git a/tools/src/py/qmf-tool b/tools/src/py/qmf-tool index e366d04709..894dc9cc7d 100755 --- a/tools/src/py/qmf-tool +++ b/tools/src/py/qmf-tool @@ -31,6 +31,100 @@ from qpid.disp import Display import cqpid import qmf2 +class OptsAndArgs(object): + + def __init__(self, argv): + self.argv = argv + self.usage = """qmf-tool [OPTIONS] [<broker-host>[:<port>]]""" + self.option_parser = optparse.OptionParser(usage=self.usage) + self.conn_group = optparse.OptionGroup(self.option_parser, "Connection Options") + self.conn_group.add_option("-u", "--user", action="store", type="string", help="User name for authentication") + self.conn_group.add_option("-p", "--password", action="store", type="string", help="Password for authentication") + self.conn_group.add_option("-t", "--transport", action="store", type="string", help="Transport type (tcp, ssl, rdma)") + self.conn_group.add_option("-m", "--mechanism", action="store", type="string", help="SASL Mechanism for security") + self.conn_group.add_option("-s", "--service", action="store", type="string", default="qpidd", help="SASL Service name") + self.conn_group.add_option("--min-ssf", action="store", type="int", metavar="<n>", help="Minimum acceptable security strength factor") + self.conn_group.add_option("--max-ssf", action="store", type="int", metavar="<n>", help="Maximum acceptable security strength factor") + self.conn_group.add_option("--conn-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional connection option(s)") + self.option_parser.add_option_group(self.conn_group) + + self.qmf_group = optparse.OptionGroup(self.option_parser, "QMF Session Options") + self.qmf_group.add_option("--domain", action="store", type="string", help="QMF Domain") + self.qmf_group.add_option("--agent-age", action="store", type="int", metavar="<n>", help="Time, in minutes, to age out non-communicating agents") + self.qmf_group.add_option("--qmf-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional QMF session option(s)") + self.option_parser.add_option_group(self.qmf_group) + + def parse(self): + host = "localhost" + conn_options = [] + qmf_options = [] + + options, encArgs = self.option_parser.parse_args(args=self.argv) + try: + encoding = locale.getpreferredencoding() + args = [a.decode(encoding) for a in encArgs] + except: + args = encArgs + + if len(args) > 1: + host = args[1] + + if options.user: + conn_options.append("username:'%s'" % options.user) + if options.password: + conn_options.append("password:'%s'" % options.password) + if options.transport: + conn_options.append("transport:'%s'" % options.transport) + if options.mechanism: + conn_options.append("sasl_mechanisms:'%s'" % options.mechanism) + if options.service: + conn_options.append("sasl_service:'%s'" % options.service) + if options.min_ssf: + conn_options.append("sasl_min_ssf:%d" % options.min_ssf) + if options.max_ssf: + conn_options.append("sasl_max_ssf:%d" % options.max_ssf) + for x in options.conn_option: + try: + key, val = x.split('=') + conn_options.append("%s:%s" % (key, val)) + except: + raise Exception("Improperly formatted text for --conn-option: '%s'" % x) + + if options.domain: + qmf_options.append("domain:'%s'" % options.domain) + if options.agent_age: + qmf_options.append("max-agent-age:%d" % options.agent_age) + for x in options.qmf_option: + try: + key, val = x.split('=') + qmf_options.append("%s:%s" % (key, val)) + except: + raise Exception("Improperly formatted text for --qmf-option: '%s'" % x) + + conn_string = '{' + first = True + for x in conn_options: + if first: + first = None + else: + conn_string += ',' + conn_string += x + conn_string += '}' + + qmf_string = '{' + first = True + for x in qmf_options: + if first: + first = None + else: + qmf_string += ',' + qmf_string += x + qmf_string += '}' + + return host, conn_string, qmf_string + + + class Mcli(Cmd): """ Management Command Interpreter """ @@ -55,10 +149,11 @@ class Mcli(Cmd): print print "Agent Commands:" print " set filter <filter-string> - Filter the list of agents" - print " show filter - Show the agent filter currently in effect" print " list agents - Print a list of the known Agents" - print " show agent <item-number> - Print detailed information about an Agent" print " set default <item-number> - Set the default agent for operations" + print " show filter - Show the agent filter currently in effect" + print " show agent <item-number> - Print detailed information about an Agent" + print " show options - Show option strings used in the QMF session" print print "Schema Commands:" print " list packages - Print a list of packages supported by the default agent" @@ -112,7 +207,7 @@ class Mcli(Cmd): def complete_show(self, text, line, begidx, endidx): tokens = split(line[:begidx]) if len(tokens) == 1: - return [i for i in ('filter', 'agent ', 'class ') if i.startswith(text)] + return [i for i in ('options', 'filter', 'agent ', 'class ') if i.startswith(text)] return [] def do_show(self, data): @@ -175,13 +270,15 @@ class Mcli(Cmd): class QmfData: """ """ - def __init__(self, disp, url): + def __init__(self, disp, url, conn_options, qmf_options): self.disp = disp self.url = url + self.conn_options = conn_options + self.qmf_options = qmf_options self.agent_filter = '[]' - self.connection = cqpid.Connection(self.url) + self.connection = cqpid.Connection(self.url, self.conn_options) self.connection.open() - self.session = qmf2.ConsoleSession(self.connection) + self.session = qmf2.ConsoleSession(self.connection, self.qmf_options) self.session.setAgentFilter(self.agent_filter) self.session.open() self.lock = Lock() @@ -239,6 +336,12 @@ class QmfData: print "What do you want to show? Type 'help' for more information." return + if tokens[0] == 'options': + print "Options used in this session:" + print " Connection Options : %s" % self.scrubConnOptions() + print " QMF Session Options: %s" % self.qmf_options + return + if tokens[0] == 'agent': self.showAgent(tokens[1:]) return @@ -636,32 +739,33 @@ class QmfData: first = None return result -def Usage(): - print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]" - print + def scrubConnOptions(self): + pw = self.conn_options.find('password:') + if pw < 0: + return self.conn_options + scrubbed = self.conn_options[:pw + 9] + "***" + delim = self.conn_options[pw:].find(',') + if delim < 0: + delim = self.conn_options[pw:].find('}') + scrubbed += self.conn_options[pw + delim:] + return scrubbed + #========================================================= # Main Program #========================================================= - -# Get host name and port if specified on the command line -cargs = sys.argv[1:] -_host = "localhost" - -if len(cargs) > 0: - _host = cargs[0] - -if _host[0] == '-': - Usage() - if _host != '-h' and _host != "--help": - print "qpid-tool: error: no such option:", _host +try: + oa = OptsAndArgs(sys.argv) + host, conn_options, qmf_options = oa.parse() +except Exception, e: + print "Parse Error: %s" % e sys.exit(1) disp = Display() # Attempt to make a connection to the target broker try: - data = QmfData(disp, _host) + data = QmfData(disp, host, conn_options, qmf_options) except Exception, e: if str(e).find("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster index 312d59f670..d4f9391dcf 100755 --- a/tools/src/py/qpid-cluster +++ b/tools/src/py/qpid-cluster @@ -247,7 +247,7 @@ def main(argv=None): parser.add_option("-k", "--all-stop", action="store_true", default=False, help="Shut down the whole cluster") parser.add_option("-f", "--force", action="store_true", default=False, help="Suppress the 'are you sure' prompt") parser.add_option("-n", "--numeric", action="store_true", default=False, help="Don't resolve names") - + opts, args = parser.parse_args(args=argv) if args: @@ -268,12 +268,12 @@ def main(argv=None): if opts.del_connection: config._delConn = opts.del_connection - if len(config._delConn.split(":")) != 2: + if len(config._delConn.split(":")) != 2: parser.error("Member ID must be of form: <host or ip>:<number>") if opts.stop: - config._stopID = opts.stop - if len(config._stopId.split(":")) != 2: + config._stopId = opts.stop + if len(config._stopId.split(":")) != 2: parser.error("Member ID must be of form: <host or ip>:<number>") config._stopAll = opts.all_stop @@ -303,6 +303,7 @@ def main(argv=None): bm.Disconnect() except Exception, e: + raise print str(e) return 1 diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index 04b31e98ed..bb49b9d7c9 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -39,17 +39,12 @@ Usage: qpid-config [OPTIONS] qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]""" description = """ -ADDRESS syntax: - - [username/password@] hostname - ip-address [:<port>] - Examples: $ qpid-config add queue q -$ qpid-config add exchange direct d localhost:5672 -$ qpid-config exchanges 10.1.1.7:10000 -$ qpid-config queues guest/guest@broker-host:10000 +$ qpid-config add exchange direct d -a localhost:5672 +$ qpid-config exchanges -a 10.1.1.7:10000 +$ qpid-config queues -a guest/guest@broker-host:10000 Add Exchange <type> values: @@ -80,6 +75,7 @@ class Config: self._recursive = False self._host = "localhost" self._connTimeout = 10 + self._ignoreDefault = False self._altern_ex = None self._passive = False self._durable = False @@ -97,6 +93,14 @@ class Config: self._eventGeneration = None self._file = None self._sasl_mechanism = None + self._flowStopCount = None + self._flowResumeCount = None + self._flowStopSize = None + self._flowResumeSize = None + self._msgGroupHeader = None + self._sharedMsgGroup = False + self._extra_arguments = [] + self._returnCode = 0 config = Config() @@ -111,6 +115,20 @@ LVQNB = "qpid.last_value_queue_no_browse" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" +FLOW_STOP_COUNT = "qpid.flow_stop_count" +FLOW_RESUME_COUNT = "qpid.flow_resume_count" +FLOW_STOP_SIZE = "qpid.flow_stop_size" +FLOW_RESUME_SIZE = "qpid.flow_resume_size" +MSG_GROUP_HDR_KEY = "qpid.group_header_key" +SHARED_MSG_GROUP = "qpid.shared_msg_group" +#There are various arguments to declare that have specific program +#options in this utility. However there is now a generic mechanism for +#passing arguments as well. The SPECIAL_ARGS list contains the +#arguments for which there are specific program options defined +#i.e. the arguments for which there is special processing on add and +#list +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -143,10 +161,14 @@ def OptionsAndArguments(argv): group1 = OptionGroup(parser, "General Options") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") - group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) + group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues") + group_ls.add_option("--ignore-default", action="store_true", help="Ignore the default exchange in exchange or queue list") + parser.add_option_group(group_ls) + group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") @@ -156,12 +178,26 @@ def OptionsAndArguments(argv): group3 = OptionGroup(parser, "Options for Adding Queues") group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node") group3.add_option("--file-count", action="store", type="int", default=8, metavar="<n>", help="Number of files in queue's persistence journal") - group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64Kib/page)") - group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal") - group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal") + group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64KiB/page)") + group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes") + group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") + group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued bytes exceeds this value.") + group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued bytes drops below this value.") + group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued messages exceeds this value.") + group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued messages drops below this value.") + group3.add_option("--group-header", action="store", type="string", metavar="<header-name>", + help="Enable message groups. Specify name of header that holds group identifier.") + group3.add_option("--shared-groups", action="store_true", + help="Allow message group consumption across multiple consumers.") + group3.add_option("--argument", dest="extra_arguments", action="append", default=[], + metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -173,7 +209,7 @@ def OptionsAndArguments(argv): group5 = OptionGroup(parser, "Options for Deleting Queues") group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty") group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty") - group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used") + group5.add_option("--force-if-used", action="store_true", help="Force delete of queue even if it's currently used") parser.add_option_group(group5) group6 = OptionGroup(parser, "Options for Declaring Bindings") @@ -196,6 +232,8 @@ def OptionsAndArguments(argv): config._connTimeout = opts.timeout if config._connTimeout == 0: config._connTimeout = None + if opts.ignore_default: + config._ignoreDefault = True if opts.alternate_exchange: config._altern_ex = opts.alternate_exchange if opts.passive: @@ -210,7 +248,7 @@ def OptionsAndArguments(argv): config._fileCount = opts.file_count if opts.file_size: config._fileSize = opts.file_size - if opts.max_queue_size: + if opts.max_queue_size != None: config._maxQueueSize = opts.max_queue_size if opts.max_queue_count: config._maxQueueCount = opts.max_queue_count @@ -229,10 +267,24 @@ def OptionsAndArguments(argv): config._if_unused = False if opts.force_if_not_empty: config._if_empty = False - if opts.force_if_not_used: + if opts.force_if_used: config._if_unused = False if opts.sasl_mechanism: config._sasl_mechanism = opts.sasl_mechanism + if opts.flow_stop_size: + config._flowStopSize = opts.flow_stop_size + if opts.flow_resume_size: + config._flowResumeSize = opts.flow_resume_size + if opts.flow_stop_count: + config._flowStopCount = opts.flow_stop_count + if opts.flow_resume_count: + config._flowResumeCount = opts.flow_resume_count + if opts.group_header: + config._msgGroupHeader = opts.group_header + if opts.shared_groups: + config._sharedMsgGroup = True + if opts.extra_arguments: + config._extra_arguments = opts.extra_arguments return args @@ -323,9 +375,16 @@ class BrokerManager: caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) + found = False for ex in exchanges: if self.match(ex.name, filter): if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) + found = True + if not found: + global config + config._returnCode = 1 + return + print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) line = "" for i in range(((maxNameLen + len(caption1)) / 5) + 5): @@ -333,9 +392,11 @@ class BrokerManager: print line for ex in exchanges: + if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), args = ex.arguments + if not args: args = {} if ex.durable: print "--durable", if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", @@ -348,6 +409,7 @@ class BrokerManager: bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for ex in exchanges: + if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: @@ -361,12 +423,18 @@ class BrokerManager: def QueueList(self, filter): queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - caption = "Queue Name" maxNameLen = len(caption) + found = False for q in queues: if self.match(q.name, filter): if len(q.name) > maxNameLen: maxNameLen = len(q.name) + found = True + if not found: + global config + config._returnCode = 1 + return + print "%-*s Attributes" % (maxNameLen, caption) line = "" for i in range((maxNameLen / 5) + 5): @@ -377,21 +445,28 @@ class BrokerManager: if self.match(q.name, filter): print "%-*s " % (maxNameLen, q.name), args = q.arguments + if not args: args = {} if q.durable: print "--durable", if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", - if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], - if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], - if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], - if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if FILESIZE in args: print "--file-size=%s" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), if LVQ in args and args[LVQ] == 1: print "--order lvq", if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", - if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], + if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, - print + if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], + if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], + if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], + if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT], + if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY], + if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups", + print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) @@ -407,6 +482,7 @@ class BrokerManager: if ex != None: ename = ex.name if ename == "": + if config._ignoreDefault: continue ename = "''" print " bind [%s] => %s" % (bind.bindingKey, ename) @@ -436,11 +512,17 @@ class BrokerManager: Usage() qname = args[0] declArgs = {} + for a in config._extra_arguments: + r = a.split("=", 1) + if len(r) == 2: value = r[1] + else: value = None + declArgs[r[0]] = value + if config._durable: declArgs[FILECOUNT] = config._fileCount declArgs[FILESIZE] = config._fileSize - if config._maxQueueSize: + if config._maxQueueSize != None: declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize if config._maxQueueCount: declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount @@ -468,11 +550,26 @@ class BrokerManager: if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration + if config._flowStopSize: + declArgs[FLOW_STOP_SIZE] = config._flowStopSize + if config._flowResumeSize: + declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize + if config._flowStopCount: + declArgs[FLOW_STOP_COUNT] = config._flowStopCount + if config._flowResumeCount: + declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount + + if config._msgGroupHeader: + declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader + if config._sharedMsgGroup: + declArgs[SHARED_MSG_GROUP] = 1 + if config._altern_ex != None: self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) + def DelQueue(self, args): if len(args) < 1: Usage() @@ -617,7 +714,7 @@ def main(argv=None): print "Failed: %s: %s" % (e.__class__.__name__, e) return 1 - return 0 + return config._returnCode if __name__ == "__main__": sys.exit(main()) diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents index 2be2f0be8b..d56d2899b1 100755 --- a/tools/src/py/qpid-printevents +++ b/tools/src/py/qpid-printevents @@ -20,7 +20,7 @@ # import os -import optparse +import optparse from optparse import IndentedHelpFormatter import sys import socket @@ -62,11 +62,11 @@ _usage = "%prog [options] [broker-addr]..." _description = \ """ -Collect and print events from one or more Qpid message brokers. +Collect and print events from one or more Qpid message brokers. If no broker-addr is supplied, %prog connects to 'localhost:5672'. -[broker-addr] syntax: +[broker-addr] syntax: [username/password@] hostname ip-address [:<port>] @@ -91,20 +91,20 @@ def main(argv=None): session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True) brokers = [] try: - for host in arguments: - brokers.append(session.addBroker(host, None, options.sasl_mechanism)) + try: + for host in arguments: + brokers.append(session.addBroker(host, None, options.sasl_mechanism)) - while (True): - sleep(10) + while (True): + sleep(10) - except KeyboardInterrupt: - print - return 0 - - except Exception, e: - print "Failed: %s - %s" % (e.__class__.__name__, e) - return 1 + except KeyboardInterrupt: + print + return 0 + except Exception, e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + return 1 finally: while len(brokers): b = brokers.pop() diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route index 3c4de85d1e..f90416d7b0 100755 --- a/tools/src/py/qpid-route +++ b/tools/src/py/qpid-route @@ -27,18 +27,18 @@ import locale from qmf.console import Session, BrokerURL usage = """ -Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] +Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism] qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> - qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> + qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism] qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> qpid-route [OPTIONS] route list [<dest-broker>] qpid-route [OPTIONS] route flush [<dest-broker>] qpid-route [OPTIONS] route map [<broker>] - qpid-route [OPTIONS] link add <dest-broker> <src-broker> + qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism] qpid-route [OPTIONS] link del <dest-broker> <src-broker> qpid-route [OPTIONS] link list [<dest-broker>]""" @@ -61,7 +61,7 @@ class Config: self._transport = "tcp" self._ack = 0 self._connTimeout = 10 - self._sasl_mechanism = None + self._client_sasl_mechanism = None config = Config() @@ -95,7 +95,7 @@ def OptionsAndArguments(argv): parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N") parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") - parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") + parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") opts, encArgs = parser.parse_args(args=argv) @@ -131,8 +131,8 @@ def OptionsAndArguments(argv): if opts.ack: config._ack = opts.ack - if opts.sasl_mechanism: - config._sasl_mechanism = opts.sasl_mechanism + if opts.client_sasl_mechanism: + config._client_sasl_mechanism = opts.client_sasl_mechanism return args @@ -143,7 +143,7 @@ class RouteManager: self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._sasl_mechanism) + self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism) self.broker._waitForStable() self.agent = self.broker.getBrokerAgent() @@ -166,7 +166,7 @@ class RouteManager: return link return None - def addLink(self, remoteBroker, mech="PLAIN"): + def addLink(self, remoteBroker, interbroker_mechanism=""): self.remote = BrokerURL(remoteBroker) if self.local.match(self.remote.host, self.remote.port): raise Exception("Linking broker to itself is not permitted") @@ -176,7 +176,7 @@ class RouteManager: link = self.getLink() if link == None: res = broker.connect(self.remote.host, self.remote.port, config._durable, - mech, self.remote.authName or "", self.remote.authPass or "", + interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "", config._transport) if config._verbose: print "Connect method returned:", res.status, res.text @@ -217,11 +217,11 @@ class RouteManager: added = False links = self.qmf.getObjects(_class="link") for link in links: - url = BrokerURL("%s:%d" % (link.host, link.port)) + url = BrokerURL(host=link.host, port=link.port) if url.name() not in self.brokerList: print " %s..." % url.name(), try: - b = self.qmf.addBroker("%s:%d" % (link.host, link.port), config._connTimeout) + b = self.qmf.addBroker(url, config._connTimeout) self.brokerList[url.name()] = b added = True print "Ok" @@ -245,7 +245,7 @@ class RouteManager: for bridge in bridges: if bridge.src == ex: link = bridge._linkRef_ - fromUrl = "%s:%s" % (link.host, link.port) + fromUrl = BrokerURL(host=link.host, port=link.port) toUrl = bridge.getBroker().getUrl() found = False for pair in pairs: @@ -295,11 +295,11 @@ class RouteManager: if b[0] != self.local.name(): self.qmf.delBroker(b[1]) - def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, mech="PLAIN", dynamic=False): + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False): if dynamic and config._srclocal: raise Exception("--src-local is not permitted on dynamic routes") - self.addLink(remoteBroker, mech) + self.addLink(remoteBroker, interbroker_mechanism) link = self.getLink() if link == None: raise Exception("Link failed to create") @@ -320,8 +320,8 @@ class RouteManager: if config._verbose: print "Bridge method returned:", res.status, res.text - def addQueueRoute(self, remoteBroker, exchange, queue): - self.addLink(remoteBroker) + def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ): + self.addLink(remoteBroker, interbroker_mechanism) link = self.getLink() if link == None: raise Exception("Link failed to create") @@ -504,10 +504,12 @@ def main(argv=None): rm = RouteManager(localBroker) if group == "link": if cmd == "add": - if nargs != 4: + if nargs < 3 or nargs > 5: Usage() return(-1) - rm.addLink(remoteBroker) + interbroker_mechanism = "" + if nargs > 4: interbroker_mechanism = args[4] + rm.addLink(remoteBroker, interbroker_mechanism) elif cmd == "del": if nargs != 4: Usage() @@ -518,16 +520,17 @@ def main(argv=None): elif group == "dynamic": if cmd == "add": - if nargs < 5 or nargs > 7: + if nargs < 5 or nargs > 8: Usage() return(-1) tag = "" excludes = "" - mech = "PLAIN" + interbroker_mechanism = "" if nargs > 5: tag = args[5] if nargs > 6: excludes = args[6] - rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True) + if nargs > 7: interbroker_mechanism = args[7] + rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True) elif cmd == "del": if nargs != 5: Usage() @@ -543,11 +546,11 @@ def main(argv=None): tag = "" excludes = "" - mech = "PLAIN" + interbroker_mechanism = "" if nargs > 6: tag = args[6] if nargs > 7: excludes = args[7] - if nargs > 8: mech = args[8] - rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False) + if nargs > 8: interbroker_mechanism = args[8] + rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False) elif cmd == "del": if nargs != 6: Usage() @@ -565,16 +568,21 @@ def main(argv=None): return(-1) elif group == "queue": - if nargs != 6: + if nargs < 6 or nargs > 7: Usage() return(-1) if cmd == "add": - rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) + interbroker_mechanism = "" + if nargs > 6: interbroker_mechanism = args[6] + rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] ) elif cmd == "del": rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) else: Usage() return(-1) + else: + Usage() + return(-1) except Exception,e: if rm: diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool index d3b0aa4097..df8b7e3f96 100755 --- a/tools/src/py/qpid-tool +++ b/tools/src/py/qpid-tool @@ -259,7 +259,24 @@ class QmfData(Console): return displayId = long(tokens[0]) methodName = tokens[1] - args = tokens[2:] + args = [] + for arg in tokens[2:]: + ## + ## If the argument is a map, list, boolean, integer, or floating (one decimal point), + ## run it through the Python evaluator so it is converted to the correct type. + ## + ## TODO: use a regex for this instead of this convoluted logic, + ## or even consider passing all args through eval() [which would + ## be a minor change to the interface as string args would then + ## always need to be quoted as strings within a map/list would + ## now] + if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \ + ((arg.count('.') < 2 and (arg.count('-') == 0 or \ + (arg.count('-') == 1 and arg[0] == '-')) and \ + arg.replace('.','').replace('-','').isdigit())): + args.append(eval(arg)) + else: + args.append(arg) obj = None try: @@ -333,7 +350,7 @@ class QmfData(Console): self.notNone(prop.unit), notes, self.notNone(prop.desc)) rows.append(row) for stat in schema.getStatistics(): - row = (stat.name, self.typeName(stat.type), "", self.notNone(prop.unit), "", self.notNone(prop.desc)) + row = (stat.name, self.typeName(stat.type), "", self.notNone(stat.unit), "", self.notNone(stat.desc)) rows.append(row) self.disp.table(title, heads, rows) |