diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
| commit | 248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch) | |
| tree | d5d0959a70218946ff72e107a6c106e32479a398 /python/commands | |
| parent | 3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff) | |
| download | qpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz | |
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
| -rwxr-xr-x | python/commands/qpid-cluster | 328 | ||||
| -rwxr-xr-x | python/commands/qpid-config | 474 | ||||
| -rwxr-xr-x | python/commands/qpid-printevents | 74 | ||||
| -rwxr-xr-x | python/commands/qpid-queue-stats | 203 | ||||
| -rwxr-xr-x | python/commands/qpid-route | 593 | ||||
| -rwxr-xr-x | python/commands/qpid-stat | 460 | ||||
| -rwxr-xr-x | python/commands/qpid-tool | 6 |
7 files changed, 1622 insertions, 516 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster new file mode 100755 index 0000000000..7afb7671b8 --- /dev/null +++ b/python/commands/qpid-cluster @@ -0,0 +1,328 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._stopId = None + self._stopAll = False + self._force = False + self._numeric = False + self._showConn = False + self._delConn = None + +def usage (): + print "Usage: qpid-cluster [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" + print " -C [--all-connections] View client connections to all cluster members" + print " -c [--connections] ID View client connections to specified member" + print " -d [--del-connection] HOST:PORT" + print " Disconnect a client connection" + print " -s [--stop] ID Stop one member of the cluster by its ID" + print " -k [--all-stop] Shut down the whole cluster" + print " -f [--force] Suppress the 'are-you-sure?' prompt" + print " -n [--numeric] Don't resolve names" + print + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class BrokerManager: + def __init__(self, config): + self.config = config + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getClusters(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + raise Exception("Clustering is not installed on the broker.") + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + raise Exception("Clustering is installed but not enabled on the broker.") + + return clusters + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(self.config._host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def overview(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + + print " Cluster Name: %s" % cluster.clusterName + print "Cluster Status: %s" % cluster.status + print " Cluster Size: %d" % cluster.clusterSize + print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) + for idx in range(1,len(idList)): + print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) + + def stopMember(self, id): + clusters = self._getClusters() + cluster = clusters[0] + idList = cluster.memberIDs.split(";") + if id not in idList: + raise Exception("No member with matching ID found") + + if not self.config._force: + prompt = "Warning: " + if len(idList) == 1: + prompt += "This command will shut down the last running cluster member." + else: + prompt += "This command will shut down a cluster member." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster.stopClusterNode(id) + + def stopAll(self): + clusters = self._getClusters() + if not self.config._force: + prompt = "Warning: This command will shut down the entire cluster." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster = clusters[0] + cluster.stopFullCluster() + + def showConnections(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + displayList = [] + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + self.brokers = [] + pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + + idx = 0 + for host in hostList: + if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: + self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) + displayList.append(idList[idx]) + idx += 1 + + idx = 0 + found = False + for broker in self.brokers: + if not self.config._delConn: + print "Clients on Member: ID=%s:" % displayList[idx] + connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) + for conn in connList: + if pattern.match(conn.address): + if self.config._numeric or self.config._delConn: + a = conn.address + else: + tokens = conn.address.split(":") + try: + hostList = socket.gethostbyaddr(tokens[0]) + host = hostList[0] + except: + host = tokens[0] + a = host + ":" + tokens[1] + if self.config._delConn: + tokens = self.config._delConn.split(":") + ip = socket.gethostbyname(tokens[0]) + toDelete = ip + ":" + tokens[1] + if a == toDelete: + print "Closing connection from client: %s" % a + conn.close() + found = True + else: + print " %s" % a + idx += 1 + if not self.config._delConn: + print + if self.config._delConn and not found: + print "Client connection '%s' not found" % self.config._delConn + + for broker in self.brokers: + self.qmf.delBroker(broker) + + +def main(argv=None): + if argv is None: argv = sys.argv + try: + config = Config() + try: + longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) + except: + usage() + return 1 + + try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] + except: + cargs = encArgs + + count = 0 + for opt in optlist: + if opt[0] == "--timeout": + config._connTimeout = int(opt[1]) + if config._connTimeout == 0: + config._connTimeout = None + if opt[0] == "-s" or opt[0] == "--stop": + config._stopId = opt[1] + if len(config._stopId.split(":")) != 2: + raise Exception("Member ID must be of form: <host or ip>:<number>") + count += 1 + if opt[0] == "-k" or opt[0] == "--all-stop": + config._stopAll = True + count += 1 + if opt[0] == "-f" or opt[0] == "--force": + config._force = True + if opt[0] == "-n" or opt[0] == "--numeric": + config._numeric = True + if opt[0] == "-C" or opt[0] == "--all-connections": + config._showConn = "all" + count += 1 + if opt[0] == "-c" or opt[0] == "--connections": + config._showConn = opt[1] + if len(config._showConn.split(":")) != 2: + raise Exception("Member ID must be of form: <host or ip>:<number>") + count += 1 + if opt[0] == "-d" or opt[0] == "--del-connection": + config._delConn = opt[1] + if len(config._delConn.split(":")) != 2: + raise Exception("Connection must be of form: <host or ip>:<port>") + count += 1 + + if count > 1: + print "Only one command option may be supplied" + print + usage() + return 1 + + nargs = len(cargs) + bm = BrokerManager(config) + + if nargs == 1: + config._host = cargs[0] + + try: + bm.SetBroker(config._host) + if config._stopId: + bm.stopMember(config._stopId) + elif config._stopAll: + bm.stopAll() + elif config._showConn or config._delConn: + bm.showConnections() + else: + bm.overview() + except KeyboardInterrupt: + print + except Exception,e: + if str(e).find("connection aborted") > 0: + # we expect this when asking the connected broker to shut down + return 0 + raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) + + bm.Disconnect() + except Exception, e: + print str(e) + return 1 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/commands/qpid-config b/python/commands/qpid-config index cc9315f7ea..39af67f39c 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -22,30 +22,39 @@ import os import getopt import sys -import socket -import qpid -from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 -from qpid.util import connect -from time import sleep - -_recursive = False -_host = "localhost" -_durable = False -_fileCount = 8 -_fileSize = 24 -_maxQueueSize = None -_maxQueueCount= None - +import locale +from qmf.console import Session + +_recursive = False +_host = "localhost" +_connTimeout = 10 +_altern_ex = None +_passive = False +_durable = False +_clusterDurable = False +_if_empty = True +_if_unused = True +_fileCount = 8 +_fileSize = 24 +_maxQueueSize = None +_maxQueueCount = None +_limitPolicy = None +_order = None +_msgSequence = False +_ive = False +_eventGeneration = None FILECOUNT = "qpid.file_count" FILESIZE = "qpid.file_size" MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" +POLICY_TYPE = "qpid.policy_type" +CLUSTER_DURABLE = "qpid.persist_last_node" +LVQ = "qpid.last_value_queue" +LVQNB = "qpid.last_value_queue_no_browse" +MSG_SEQUENCE = "qpid.msg_sequence" +IVE = "qpid.ive" +QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" def Usage (): print "Usage: qpid-config [OPTIONS]" @@ -54,68 +63,99 @@ def Usage (): print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]" print " qpid-config [OPTIONS] del exchange <name>" print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]" - print " qpid-config [OPTIONS] del queue <name>" + print " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]" print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" print print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" print " -b [ --bindings ] Show bindings in queue or exchange list" print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print "Add Queue Options:" - print " --durable Queue is durable" - print " --file-count N (8) Number of files in queue's persistence journal" - print " --file-size N (24) File size in pages (64Kib/page)" - print " --max-queue-size N Maximum in-memory queue size as bytes" - print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --alternate-exchange [name of the alternate exchange]" + print " The alternate-exchange field specifies how messages on this queue should" + print " be treated when they are rejected by a subscriber, or when they are" + print " orphaned by queue deletion. When present, rejected or orphaned messages" + print " MUST be routed to the alternate-exchange. In all cases the messages MUST" + print " be removed from the queue." + print " --passive Do not actually change the broker state (queue will not be created)" + print " --durable Queue is durable" + print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" + print " --file-count N (8) Number of files in queue's persistence journal" + print " --file-size N (24) File size in pages (64Kib/page)" + print " --max-queue-size N Maximum in-memory queue size as bytes" + print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]" + print " Action taken when queue limit is reached:" + print " none (default) - Use broker's default policy" + print " reject - Reject enqueued messages" + print " flow-to-disk - Page messages to disk" + print " ring - Replace oldest unacquired message with new" + print " ring-strict - Replace oldest message, reject if oldest is acquired" + print " --order [fifo | lvq | lvq-no-browse]" + print " Set queue ordering policy:" + print " fifo (default) - First in, first out" + print " lvq - Last Value Queue ordering, allows queue browsing" + print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data" + print " --generate-queue-events N" + print " If set to 1, every enqueue will generate an event that can be processed by" + print " registered listeners (e.g. for replication). If set to 2, events will be" + print " generated for enqueues and dequeues" + print + print "Del Queue Options:" + print " --force Force delete of queue even if it's currently used or it's not empty" + print " --force-if-not-empty Force delete of queue even if it's not empty" + print " --force-if-used Force delete of queue even if it's currently used" + print + print "Add Exchange <type> values:" + print " direct Direct exchange for point-to-point communication" + print " fanout Fanout exchange for broadcast communication" + print " topic Topic exchange that routes messages using binding keys with wildcards" + print " headers Headers exchange that matches header fields against the binding keys" print print "Add Exchange Options:" - print " --durable Exchange is durable" + print " --alternate-exchange [name of the alternate exchange]" + print " In the event that a message cannot be routed, this is the name of the exchange to" + print " which the message will be sent. Messages transferred using message.transfer will" + print " be routed to the alternate-exchange only if they are sent with the \"none\"" + print " accept-mode, and the discard-unroutable delivery property is set to false, and" + print " there is no queue to route to for the given message according to the bindings" + print " on this exchange." + print " --passive Do not actually change the broker state (exchange will not be created)" + print " --durable Exchange is durable" + print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" + print " with a value that increments for each message forwarded." + print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" + print " to the last message forwarded and enqueuing that message to newly bound" + print " queues." print sys.exit (1) class BrokerManager: def __init__ (self): - self.dest = None - self.src = None - self.broker = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session (self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) def Overview (self): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -136,30 +176,39 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - print "Durable Type Bindings Exchange Name" - print "=======================================================" + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + caption1 = "Type " + caption2 = "Exchange Name" + maxNameLen = len(caption2) + for ex in exchanges: + if self.match(ex.name, filter): + if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) + print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) + line = "" + for i in range(((maxNameLen + len(caption1)) / 5) + 5): + line += "=====" + print line + for ex in exchanges: if self.match (ex.name, filter): - print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) + print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), + args = ex.arguments + if ex.durable: print "--durable", + if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", + if IVE in args and args[IVE] == 1: print "--ive", + if ex.altExchange: + print "--alternate-exchange=%s" % ex._altExchange_.name, + print def ExchangeListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.id: + if bind.exchangeRef == ex.getObjectId(): qname = "<unknown>" queue = self.findById (queues, bind.queueRef) if queue != None: @@ -168,43 +217,48 @@ class BrokerManager: def QueueList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - queues = mc.syncGetObjects (mch, "queue") - journals = mc.syncGetObjects (mch, "journal") - print " Store Size" - print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" - print "===========================================================================================" + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + + caption = "Queue Name" + maxNameLen = len(caption) + for q in queues: + if self.match (q.name, filter): + if len(q.name) > maxNameLen: maxNameLen = len(q.name) + print "%-*s Attributes" % (maxNameLen, caption) + line = "" + for i in range((maxNameLen / 5) + 5): + line += "=====" + print line + for q in queues: if self.match (q.name, filter): + print "%-*s " % (maxNameLen, q.name), args = q.arguments - if q.durable and FILESIZE in args and FILECOUNT in args: - fs = int (args[FILESIZE]) - fc = int (args[FILECOUNT]) - print "%4c%9c%7c%10d%11dx%-14d%s" % \ - (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindingCount, fc, fs, q.name) - else: - if not _durable: - print "%4c%9c%7c%10d %s" % \ - (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindingCount, q.name) + if q.durable: print "--durable", + if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", + if q.autoDelete: print "auto-del", + if q.exclusive: print "excl", + if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), + if LVQ in args and args[LVQ] == 1: print "--order lvq", + if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], + if q.altExchange: + print "--alternate-exchange=%s" % q._altExchange_.name, + print def QueueListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.id: + if bind.queueRef == queue.getObjectId(): ename = "<unknown>" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: @@ -216,30 +270,27 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () etype = args[0] ename = args[1] - - try: - self.session.exchange_declare (exchange=ename, type=etype, durable=_durable) - except Closed, e: - print "Failed:", e + declArgs = {} + if _msgSequence: + declArgs[MSG_SEQUENCE] = 1 + if _ive: + declArgs[IVE] = 1 + if _altern_ex != None: + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + else: + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) def DelExchange (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () ename = args[0] - - try: - self.session.exchange_delete (exchange=ename) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] declArgs = {} if _durable: @@ -250,56 +301,64 @@ class BrokerManager: declArgs[MAX_QUEUE_SIZE] = _maxQueueSize if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - - try: - self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs) - except Closed, e: - print "Failed:", e + if _limitPolicy: + if _limitPolicy == "none": + pass + elif _limitPolicy == "reject": + declArgs[POLICY_TYPE] = "reject" + elif _limitPolicy == "flow-to-disk": + declArgs[POLICY_TYPE] = "flow_to_disk" + elif _limitPolicy == "ring": + declArgs[POLICY_TYPE] = "ring" + elif _limitPolicy == "ring-strict": + declArgs[POLICY_TYPE] = "ring_strict" + + if _clusterDurable: + declArgs[CLUSTER_DURABLE] = 1 + if _order: + if _order == "fifo": + pass + elif _order == "lvq": + declArgs[LVQ] = 1 + elif _order == "lvq-no-browse": + declArgs[LVQNB] = 1 + if _eventGeneration: + declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration + + if _altern_ex != None: + self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + else: + self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] - - try: - self.session.queue_delete (queue=qname) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) def Bind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: - if item.id == id: + if item.getObjectId() == id: return item return None @@ -315,23 +374,43 @@ def YN (bool): return 'Y' return 'N' + ## ## Main Program ## try: - longOpts = ("durable", "bindings", "broker-addr=", "file-count=", "file-size=", "max-queue-size=", "max-queue-count=") - (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) + longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", + "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", + "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty", + "force_if_used", "alternate-exchange=", "passive", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) except: Usage () +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + for opt in optlist: if opt[0] == "-b" or opt[0] == "--bindings": _recursive = True if opt[0] == "-a" or opt[0] == "--broker-addr": _host = opt[1] + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None + if opt[0] == "--alternate-exchange": + _altern_ex = opt[1] + if opt[0] == "--passive": + _passive = True if opt[0] == "--durable": _durable = True + if opt[0] == "--cluster-durable": + _clusterDurable = True if opt[0] == "--file-count": _fileCount = int (opt[1]) if opt[0] == "--file-size": @@ -340,46 +419,77 @@ for opt in optlist: _maxQueueSize = int (opt[1]) if opt[0] == "--max-queue-count": _maxQueueCount = int (opt[1]) + if opt[0] == "--limit-policy": + _limitPolicy = opt[1] + if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"): + print "Error: Invalid --limit-policy argument" + sys.exit(1) + if opt[0] == "--order": + _order = opt[1] + if _order not in ("fifo", "lvq", "lvq-no-browse"): + print "Error: Invalid --order argument" + sys.exit(1) + if opt[0] == "--sequence": + _msgSequence = True + if opt[0] == "--ive": + _ive = True + if opt[0] == "--generate-queue-events": + _eventGeneration = int (opt[1]) + if opt[0] == "--force": + _if_empty = False + _if_unused = False + if opt[0] == "--force-if-not-empty": + _if_empty = False + if opt[0] == "--force-if-used": + _if_unused = False + nargs = len (cargs) bm = BrokerManager () -bm.SetBroker (Broker (_host)) - -if nargs == 0: - bm.Overview () -else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd[0] == 'e': - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd[0] == 'q': - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) + +try: + bm.SetBroker(_host) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd == "exchanges": + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd == "queues": + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) else: Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () +except KeyboardInterrupt: + print +except Exception,e: + print "Failed: %s: %s" % (e.__class__.__name__, e) + sys.exit(1) + bm.Disconnect() diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents new file mode 100755 index 0000000000..0c1b618a1f --- /dev/null +++ b/python/commands/qpid-printevents @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import optparse +import sys +import socket +from time import time, strftime, gmtime, sleep +from qmf.console import Console, Session + +class EventConsole(Console): + def event(self, broker, event): + print event + + def brokerConnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() + + def brokerDisconnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() + + +## +## Main Program +## +def main(): + _usage = "%prog [options] [broker-addr]..." + _description = \ +"""Collect and print events from one or more Qpid message brokers. If no broker-addr is +supplied, %prog will connect to 'localhost:5672'. +broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] +ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost +""" + p = optparse.OptionParser(usage=_usage, description=_description) + + options, arguments = p.parse_args() + if len(arguments) == 0: + arguments.append("localhost") + + console = EventConsole() + session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True) + brokers = [] + for host in arguments: + brokers.append(session.addBroker(host)) + + try: + while (True): + sleep(10) + except KeyboardInterrupt: + for broker in brokers: + session.delBroker(broker) + print + sys.exit(0) + +if __name__ == '__main__': + main() + diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats index 98dfa7580a..3b8a0dcb19 100755 --- a/python/commands/qpid-queue-stats +++ b/python/commands/qpid-queue-stats @@ -26,120 +26,100 @@ import re import socket import qpid from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker +from qmf.console import Session, Console from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect from time import sleep -class mgmtObject (object): - """ Generic object that holds the contents of a management object with its - attributes set as object attributes. """ - - def __init__ (self, classKey, timestamps, row): - self.classKey = classKey - self.timestamps = timestamps - for cell in row: - setattr (self, cell[0], cell[1]) - - - -class BrokerManager: - def __init__ (self): - self.dest = None - self.src = None - self.broker = None - self.objects = {} - self.filter = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def setFilter(self,filter): - self.filter = filter - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def configCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - self.objects[obj.id] = (obj.name, None, None) - - def instCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - return - - (name, first, last) = self.objects[obj.id] - if first == None: - self.objects[obj.id] = (name, obj, None) - return - - if len(self.filter) > 0 : - match = False - - for x in self.filter: - if x.match(name): - match = True - break - if match == False: - return - - if last == None: - lastSample = first - else: - lastSample = last - - self.objects[obj.id] = (name, first, obj) - - deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0]) - enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0) - dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0) - print "%-41s%10.2f%11d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate) - - - def Display (self): - self.ConnectToBroker () - print "Queue Name Sec Depth Enq Rate Deq Rate" - print "========================================================================================" - try: - while True: - sleep (1) - except KeyboardInterrupt: - pass - self.Disconnect () +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, + userBindings=True, manageConnections=True) + self.broker = self.session.addBroker(self.url) + self.firstError = True + + def setFilter(self,filter): + self.filter = filter + + def brokerConnected(self, broker): + if not self.firstError: + print "*** Broker connected" + self.firstError = False + + def brokerDisconnected(self, broker): + print "*** Broker connection lost - %s, retrying..." % broker.getError() + self.firstError = False + self.objects.clear() + + def objectProps(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False + + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + if deltaTime < 1000000000.0: + return + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + sys.stdout.flush() + + + def Display (self): + self.session.bindClass("org.apache.qpid.broker", "queue") + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + sys.stdout.flush() + try: + while True: + sleep (1) + if self.firstError and self.broker.getError(): + self.firstError = False + print "*** Error: %s, retrying..." % self.broker.getError() + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) ## ## Main Program @@ -157,8 +137,7 @@ def main(): for s in options.filter.split(","): filter.append(re.compile(s)) - bm = BrokerManager () - bm.SetBroker (Broker (host)) + bm = BrokerManager(host) bm.setFilter(filter) bm.Display() diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 3cd9109a6a..9965047000 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -22,280 +22,379 @@ import getopt import sys import socket -import qpid import os -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect - -def Usage (): - print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link list [<dest-broker>]" +import locale +from qmf.console import Session, BrokerURL + +def Usage(): + print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" + print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" print print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]" print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" + print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>" + print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>" print " qpid-route [OPTIONS] route list [<dest-broker>]" print " qpid-route [OPTIONS] route flush [<dest-broker>]" + print " qpid-route [OPTIONS] route map [<broker>]" + print + print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link list [<dest-broker>]" print print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" print " -v [ --verbose ] Verbose output" print " -q [ --quiet ] Quiet output, don't print duplicate warnings" print " -d [ --durable ] Added configuration shall be durable" print " -e [ --del-empty-link ] Delete link after deleting last route on the link" + print " -s [ --src-local ] Make connection to source broker (push route)" + print " --ack N Acknowledge transfers over the bridge in batches of N" + print " -t <transport> [ --transport <transport>]" + print " Specify transport to use for links, defaults to tcp" print print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print - sys.exit (1) + sys.exit(1) -_verbose = False -_quiet = False -_durable = False -_dellink = False +_verbose = False +_quiet = False +_durable = False +_dellink = False +_srclocal = False +_transport = "tcp" +_ack = 0 +_connTimeout = 10 class RouteManager: - def __init__ (self, destBroker): - self.dest = Broker (destBroker) - self.src = None + def __init__(self, localBroker): + self.local = BrokerURL(localBroker) + self.remote = None + self.qmf = Session() + self.broker = self.qmf.addBroker(localBroker, _connTimeout) - def ConnectToBroker (self): - broker = self.dest - if _verbose: - print "Connecting to broker: %s:%d" % (broker.host, broker.port) - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), \ - username=broker.username, password=broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mch = self.mclient.addChannel (self.session) - self.mclient.syncWaitForStable (self.mch) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mch) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def getLink (self): - links = self.mclient.syncGetObjects (self.mch, "link") + def disconnect(self): + self.qmf.delBroker(self.broker) + + def getLink(self): + links = self.qmf.getObjects(_class="link") for link in links: - if "%s:%d" % (link.host, link.port) == self.src.name (): + if self.remote.match(link.host, link.port): return link return None - def AddLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - - if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + def addLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + if self.local.match(self.remote.host, self.remote.port): + raise Exception("Linking broker to itself is not permitted") - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() - if link != None: - print "Link already exists" - sys.exit(1) - - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" - else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) - if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () - - def DelLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient + if link == None: + if not self.remote.authName or self.remote.authName == "anonymous": + mech = "ANONYMOUS" + else: + mech = "PLAIN" + res = broker.connect(self.remote.host, self.remote.port, _durable, + mech, self.remote.authName or "", self.remote.authPass or "", + _transport) + if _verbose: + print "Connect method returned:", res.status, res.text - brokers = mc.syncGetObjects (self.mch, "broker") + def delLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link == None: - print "Link not found" - sys.exit(1) + raise Exception("Link not found") - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if _verbose: - print "Close method returned:", res.status, res.statusText + print "Close method returned:", res.status, res.text - def ListLinks (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") + def listLinks(self): + links = self.qmf.getObjects(_class="link") if len(links) == 0: print "No Links Found" else: print - print "Host Port Durable State Last Error" - print "===================================================================" + print "Host Port Transport Durable State Last Error" + print "=============================================================================" + for link in links: + print "%-16s%-8d%-13s%c %-18s%s" % \ + (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) + + def mapRoutes(self): + qmf = self.qmf + print + print "Finding Linked Brokers:" + + brokerList = {} + brokerList[self.local.name()] = self.broker + print " %s... Ok" % self.local + + added = True + while added: + added = False + links = qmf.getObjects(_class="link") for link in links: - print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) + url = BrokerURL("%s:%d" % (link.host, link.port)) + if url.name() not in brokerList: + print " %s..." % url.name(), + try: + b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout) + brokerList[url.name()] = b + added = True + print "Ok" + except Exception, e: + print e + + print + print "Dynamic Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=True) + fedExchanges = [] + for bridge in bridges: + if bridge.src not in fedExchanges: + fedExchanges.append(bridge.src) + if len(fedExchanges) == 0: + print " none found" + print + + for ex in fedExchanges: + print " Exchange %s:" % ex + pairs = [] + for bridge in bridges: + if bridge.src == ex: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + found = False + for pair in pairs: + if pair.matches(fromUrl, toUrl): + found = True + if not found: + pairs.append(RoutePair(fromUrl, toUrl)) + for pair in pairs: + print " %s" % pair + print - def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): - self.src = Broker (srcBroker) - mc = self.mclient + print "Static Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=False) + if len(bridges) == 0: + print " none found" + print - if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + for bridge in bridges: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + leftType = "ex" + rightType = "ex" + if bridge.srcIsLocal: + arrow = "=>" + left = bridge.src + right = bridge.dest + if bridge.srcIsQueue: + leftType = "queue" + else: + arrow = "<=" + left = bridge.dest + right = bridge.src + if bridge.srcIsQueue: + rightType = "queue" + + if bridge.srcIsQueue: + print " %s(%s=%s) %s %s(%s=%s)" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right) + else: + print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) + print + + for broker in brokerList: + if broker != self.local.name(): + qmf.delBroker(brokerList[broker]) - brokers = mc.syncGetObjects (self.mch, "broker") - broker = brokers[0] - link = self.getLink () + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): + if dynamic and _srclocal: + raise Exception("--src-local is not permitted on dynamic routes") + + self.addLink(remoteBroker) + link = self.getLink() if link == None: - if _verbose: - print "Inter-broker link not found, creating..." - - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" - else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) - if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + raise Exception("Link failed to create") + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: + if not _quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) + sys.exit(0) + + if _verbose: + print "Creating inter-broker binding..." + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack) + if res.status != 0: + raise Exception(res.text) + if _verbose: + print "Bridge method returned:", res.status, res.text + + def addQueueRoute(self, remoteBroker, exchange, queue): + self.addLink(remoteBroker) + link = self.getLink() if link == None: - print "Protocol Error - Missing link ID" - sys.exit (1) + raise Exception("Link failed to create") - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: if not _quiet: - print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) - sys.exit (1) - sys.exit (0) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) + sys.exit(0) if _verbose: print "Creating inter-broker binding..." - bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = tag - bridgeArgs["excludes"] = excludes - bridgeArgs["srcIsQueue"] = 0 - bridgeArgs["srcIsLocal"] = 0 - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) - if res.status == 4: - print "Can't create a durable route on a non-durable link" - sys.exit(1) + res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack) + if res.status != 0: + raise Exception(res.text) if _verbose: - print "Bridge method returned:", res.status, res.statusText + print "Bridge method returned:", res.status, res.text - def DelRoute (self, srcBroker, exchange, routingKey): - self.src = Broker (srcBroker) - mc = self.mclient + def delQueueRoute(self, remoteBroker, exchange, queue): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not _quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if _verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if _verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not _quiet: + raise Exception("Route not found") - link = self.getLink () + def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() if link == None: if not _quiet: - print "No link found from %s to %s" % (self.src.name(), self.dest.name()) - sys.exit (1) - sys.exit (0) + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ + and bridge.dynamic == dynamic: if _verbose: print "Closing bridge..." - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error closing bridge: %d - %s" % (res.status, res.statusText) - sys.exit (1) - if len (bridges) == 1 and _dellink: - link = self.getLink () + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() if link == None: - sys.exit (0) + sys.exit(0) if _verbose: print "Last bridge on link, closing link..." - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error closing link: %d - %s" % (res.status, res.statusText) - sys.exit (1) - sys.exit (0) + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) if not _quiet: - print "Route not found" - sys.exit (1) + raise Exception("Route not found") - def ListRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + def listRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: - print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) + if bridge.dynamic: + keyText = "<dynamic>" + else: + keyText = bridge.key + print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) - def ClearAllRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + def clearAllRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if _verbose: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error: %d - %s" % (res.status, res.statusText) + print "Error: %d - %s" % (res.status, res.text) elif _verbose: print "Ok" if _dellink: - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(_class="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error: %d - %s" % (res.status, res.statusText) + print "Error: %d - %s" % (res.status, res.text) elif _verbose: print "Ok" +class RoutePair: + def __init__(self, fromUrl, toUrl): + self.fromUrl = fromUrl + self.toUrl = toUrl + self.bidir = False + + def __repr__(self): + if self.bidir: + delimit = "<=>" + else: + delimit = " =>" + return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) + + def matches(self, fromUrl, toUrl): + if fromUrl == self.fromUrl and toUrl == self.toUrl: + return True + if toUrl == self.fromUrl and fromUrl == self.toUrl: + self.bidir = True + return True + return False + + def YN(val): if val == 1: return 'Y' @@ -306,12 +405,22 @@ def YN(val): ## try: - longOpts = ("verbose", "quiet", "durable", "del-empty-link") - (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqde", longOpts) + longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts) except: - Usage () + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs for opt in optlist: + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None if opt[0] == "-v" or opt[0] == "--verbose": _verbose = True if opt[0] == "-q" or opt[0] == "--quiet": @@ -320,52 +429,96 @@ for opt in optlist: _durable = True if opt[0] == "-e" or opt[0] == "--del-empty-link": _dellink = True - -nargs = len (cargs) + if opt[0] == "-s" or opt[0] == "--src-local": + _srclocal = True + if opt[0] == "-t" or opt[0] == "--transport": + _transport = opt[1] + if opt[0] == "--ack": + _ack = int(opt[1]) + +nargs = len(cargs) if nargs < 2: - Usage () + Usage() if nargs == 2: - destBroker = "localhost" + localBroker = "localhost" else: - destBroker = cargs[2] + if _srclocal: + localBroker = cargs[3] + remoteBroker = cargs[2] + else: + localBroker = cargs[2] + if nargs > 3: + remoteBroker = cargs[3] group = cargs[0] cmd = cargs[1] -rm = RouteManager (destBroker) -rm.ConnectToBroker () -if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.AddLink (cargs[3]) - elif cmd == "del": - if nargs != 4: - Usage() - rm.DelLink (cargs[3]) - elif cmd == "list": - rm.ListLinks () - -elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage () - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) - elif cmd == "del": - if nargs != 6: - Usage () +try: + rm = RouteManager(localBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.addLink(remoteBroker) + elif cmd == "del": + if nargs != 4: + Usage() + rm.delLink(remoteBroker) + elif cmd == "list": + rm.listLinks() + + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 7: + Usage() + + tag = "" + excludes = "" + if nargs > 5: tag = cargs[5] + if nargs > 6: excludes = cargs[6] + rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True) + elif cmd == "del": + if nargs != 5: + Usage() + else: + rm.delRoute(remoteBroker, cargs[4], "", dynamic=True) + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage() + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False) + elif cmd == "del": + if nargs != 6: + Usage() + rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) - else: - if cmd == "list": - rm.ListRoutes () - elif cmd == "flush": - rm.ClearAllRoutes () + if cmd == "list": + rm.listRoutes() + elif cmd == "flush": + rm.clearAllRoutes() + else: + Usage() + + elif group == "queue": + if nargs != 6: + Usage() + if cmd == "add": + rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) else: - Usage () -rm.Disconnect () + Usage() + +except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + sys.exit(1) + +rm.disconnect() diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat new file mode 100755 index 0000000000..29deeb2342 --- /dev/null +++ b/python/commands/qpid-stat @@ -0,0 +1,460 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session, Console +from qpid.disp import Display, Header, Sorter + +_host = "localhost" +_connTimeout = 10 +_types = "" +_limit = 50 +_increasing = False +_sortcol = None +pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + +def Usage (): + print "Usage: qpid-stat [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "General Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" +# print " -n [--numeric] Don't resolve names" + print + print "Display Options:" + print + print " -b Show Brokers" + print " -c Show Connections" +# print " -s Show Sessions" + print " -e Show Exchanges" + print " -q Show Queues" + print + print " -S [--sort-by] COLNAME Sort by column name" + print " -I [--increasing] Sort by increasing value (default = decreasing)" + print " -L [--limit] NUM Limit output to NUM rows (default = 50)" + print + sys.exit (1) + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + + agents = qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.connections = {} + self.sessions = {} + self.exchanges = {} + self.queues = {} + package = "org.apache.qpid.broker" + + list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) + for conn in list: + if pattern.match(conn.address): + self.connections[conn.getObjectId()] = conn + + list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) + for sess in list: + if sess.connectionRef in self.connections: + self.sessions[sess.getObjectId()] = sess + + list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) + for exchange in list: + self.exchanges[exchange.getObjectId()] = exchange + + list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) + for queue in list: + self.queues[queue.getObjectId()] = queue + + def getName(self): + return self.broker.getUrl() + + def getCurrentTime(self): + return self.currentTime + + def getUptime(self): + return self.uptime + +class BrokerManager(Console): + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): + if len(subs) == 0: + return + this = subs[0] + remaining = subs[1:] + newindent = indent + " " + if this == 'b': + pass + elif this == 'c': + if broker: + for oid in broker.connections: + iconn = broker.connections[oid] + self.printConnSub(indent, broker.getName(), iconn) + self.displaySubs(remaining, newindent, broker=broker, conn=iconn, + sess=sess, exchange=exchange, queue=queue) + elif this == 's': + pass + elif this == 'e': + pass + elif this == 'q': + pass + print + + def displayBroker(self, subs): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('broker')) + heads.append(Header('cluster')) + heads.append(Header('uptime', Header.DURATION)) + heads.append(Header('conn', Header.KMG)) + heads.append(Header('sess', Header.KMG)) + heads.append(Header('exch', Header.KMG)) + heads.append(Header('queue', Header.KMG)) + rows = [] + for broker in self.brokers: + if self.cluster: + ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) + else: + ctext = "<standalone>" + row = (broker.getName(), ctext, broker.getUptime(), + len(broker.connections), len(broker.sessions), + len(broker.exchanges), len(broker.queues)) + rows.append(row) + title = "Brokers" + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayConn(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header('client-addr')) + heads.append(Header('cproc')) + heads.append(Header('cpid')) + heads.append(Header('auth')) + heads.append(Header('connected', Header.DURATION)) + heads.append(Header('idle', Header.DURATION)) + heads.append(Header('msgIn', Header.KMG)) + heads.append(Header('msgOut', Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.connections: + conn = broker.connections[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) + idle = broker.getCurrentTime() - conn.getTimestamps()[0] + row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) + row.append(conn.framesFromClient) + row.append(conn.framesToClient) + rows.append(row) + title = "Connections" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySession(self, subs): + disp = Display(prefix=" ") + + def displayExchange(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("exchange")) + heads.append(Header("type")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("bind", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("msgDrop", Header.KMG)) + heads.append(Header("byteIn", Header.KMG)) + heads.append(Header("byteOut", Header.KMG)) + heads.append(Header("byteDrop", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.exchanges: + ex = broker.exchanges[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) + title = "Exchanges" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("queue")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("autoDel", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("msg", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("bytes", Header.KMG)) + heads.append(Header("bytesIn", Header.KMG)) + heads.append(Header("bytesOut", Header.KMG)) + heads.append(Header("cons", Header.KMG)) + heads.append(Header("bind", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.queues: + q = broker.queues[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) + title = "Queues" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayMain(self, main, subs): + if main == 'b': self.displayBroker(subs) + elif main == 'c': self.displayConn(subs) + elif main == 's': self.displaySession(subs) + elif main == 'e': self.displayExchange(subs) + elif main == 'q': self.displayQueue(subs) + + def display(self): + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + if _host.find("@") > 0: + authString = _host.split("@")[0] + "@" + else: + authString = "" + for host in hostList: + b = self.qmf.addBroker(authString + host, _connTimeout) + self.brokers.append(Broker(self.qmf, b)) + else: + self.brokers.append(Broker(self.qmf, self.broker)) + + self.displayMain(_types[0], _types[1:]) + + +## +## Main Program +## + +try: + longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None + elif opt[0] == "-n" or opt[0] == "--numeric": + _numeric = True + elif opt[0] == "-S" or opt[0] == "--sort-by": + _sortcol = opt[1] + elif opt[0] == "-I" or opt[0] == "--increasing": + _increasing = True + elif opt[0] == "-L" or opt[0] == "--limit": + _limit = int(opt[1]) + elif len(opt[0]) == 2: + char = opt[0][1] + if "bcseq".find(char) != -1: + _types += char + else: + Usage() + else: + Usage() + +if len(_types) == 0: + Usage() + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + bm.display() +except KeyboardInterrupt: + print +except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + sys.exit(1) + +bm.Disconnect() diff --git a/python/commands/qpid-tool b/python/commands/qpid-tool index 60535c253b..05afcc9732 100755 --- a/python/commands/qpid-tool +++ b/python/commands/qpid-tool @@ -24,7 +24,7 @@ import getopt import sys import socket from cmd import Cmd -from qpid.connection import ConnectionFailed +from qpid.connection import ConnectionFailed, Timeout from qpid.managementdata import ManagementData from shlex import split from qpid.disp import Display @@ -148,7 +148,7 @@ class Mcli (Cmd): self.dataObject.close () def Usage (): - print "Usage: qpid-tool [<target-host[:<tcp-port>]]" + print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]" print sys.exit (1) @@ -183,6 +183,8 @@ except ConnectionFailed, e: except Exception, e: if str(e).find ("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." + else: + print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) # Instantiate the CLI interpreter and launch it. |
