diff options
Diffstat (limited to 'RC9/qpid/python/commands/qpid-config')
| -rwxr-xr-x | RC9/qpid/python/commands/qpid-config | 392 |
1 files changed, 392 insertions, 0 deletions
diff --git a/RC9/qpid/python/commands/qpid-config b/RC9/qpid/python/commands/qpid-config new file mode 100755 index 0000000000..ff3c7db46e --- /dev/null +++ b/RC9/qpid/python/commands/qpid-config @@ -0,0 +1,392 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +from qmf.console import Session + +_recursive = False +_host = "localhost" +_durable = False +_clusterDurable = False +_fileCount = 8 +_fileSize = 24 +_maxQueueSize = None +_maxQueueCount = None +_policyType = None +_lvq = False +_msgSequence = False +_ive = False + +FILECOUNT = "qpid.file_count" +FILESIZE = "qpid.file_size" +MAX_QUEUE_SIZE = "qpid.max_size" +MAX_QUEUE_COUNT = "qpid.max_count" +POLICY_TYPE = "qpid.policy_type" +CLUSTER_DURABLE = "qpid.persist_last_node" +LVQ = "qpid.last_value_queue" +MSG_SEQUENCE = "qpid.msg_sequence" +IVE = "qpid.ive" + +def Usage (): + print "Usage: qpid-config [OPTIONS]" + print " qpid-config [OPTIONS] exchanges [filter-string]" + print " qpid-config [OPTIONS] queues [filter-string]" + print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]" + print " qpid-config [OPTIONS] del exchange <name>" + print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]" + print " qpid-config [OPTIONS] del queue <name>" + print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" + print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" + print + print "Options:" + print " -b [ --bindings ] Show bindings in queue or exchange list" + print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Add Queue Options:" + print " --durable Queue is durable" + print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" + print " --file-count N (8) Number of files in queue's persistence journal" + print " --file-size N (24) File size in pages (64Kib/page)" + print " --max-queue-size N Maximum in-memory queue size as bytes" + print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)" + print " --last-value-queue Enable LVQ behavior on the queue" + print + print "Add Exchange Options:" + print " --durable Exchange is durable" + print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" + print " with a value that increments for each message forwarded." + print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" + print " to the last message forwarded and enqueuing that message to newly bound" + print " queues." + print + sys.exit (1) + +class BrokerManager: + def __init__ (self): + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def Overview (self): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + print "Total Exchanges: %d" % len (exchanges) + etype = {} + for ex in exchanges: + if ex.type not in etype: + etype[ex.type] = 1 + else: + etype[ex.type] = etype[ex.type] + 1 + for typ in etype: + print "%15s: %d" % (typ, etype[typ]) + + print + print " Total Queues: %d" % len (queues) + _durable = 0 + for queue in queues: + if queue.durable: + _durable = _durable + 1 + print " durable: %d" % _durable + print " non-durable: %d" % (len (queues) - _durable) + + def ExchangeList (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + caption1 = "Type " + caption2 = "Exchange Name" + maxNameLen = len(caption2) + for ex in exchanges: + if self.match(ex.name, filter): + if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) + print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) + line = "" + for i in range(((maxNameLen + len(caption1)) / 5) + 5): + line += "=====" + print line + + for ex in exchanges: + if self.match (ex.name, filter): + print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), + args = ex.arguments + if ex.durable: print "--durable", + if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", + if IVE in args and args[IVE] == 1: print "--ive", + print + + def ExchangeListRecurse (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + for ex in exchanges: + if self.match (ex.name, filter): + print "Exchange '%s' (%s)" % (ex.name, ex.type) + for bind in bindings: + if bind.exchangeRef == ex.getObjectId(): + qname = "<unknown>" + queue = self.findById (queues, bind.queueRef) + if queue != None: + qname = queue.name + print " bind [%s] => %s" % (bind.bindingKey, qname) + + + def QueueList (self, filter): + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + + caption = "Queue Name" + maxNameLen = len(caption) + for q in queues: + if self.match (q.name, filter): + if len(q.name) > maxNameLen: maxNameLen = len(q.name) + print "%-*s Attributes" % (maxNameLen, caption) + line = "" + for i in range((maxNameLen / 5) + 5): + line += "=====" + print line + + for q in queues: + if self.match (q.name, filter): + print "%-*s " % (maxNameLen, q.name), + args = q.arguments + if q.durable: print "--durable", + if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", + if q.autoDelete: print "auto-del", + if q.exclusive: print "excl", + if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if POLICY_TYPE in args: print "--policy-type=%s" % args[POLICY_TYPE], + if LVQ in args and args[LVQ] == 1: print "--last-value-queue", + print + + def QueueListRecurse (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + for queue in queues: + if self.match (queue.name, filter): + print "Queue '%s'" % queue.name + for bind in bindings: + if bind.queueRef == queue.getObjectId(): + ename = "<unknown>" + ex = self.findById (exchanges, bind.exchangeRef) + if ex != None: + ename = ex.name + if ename == "": + ename = "''" + print " bind [%s] => %s" % (bind.bindingKey, ename) + + def AddExchange (self, args): + if len (args) < 2: + Usage () + etype = args[0] + ename = args[1] + declArgs = {} + if _msgSequence: + declArgs[MSG_SEQUENCE] = 1 + if _ive: + declArgs[IVE] = 1 + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable, arguments=declArgs) + + def DelExchange (self, args): + if len (args) < 1: + Usage () + ename = args[0] + self.broker.getAmqpSession().exchange_delete (exchange=ename) + + def AddQueue (self, args): + if len (args) < 1: + Usage () + qname = args[0] + declArgs = {} + if _durable: + declArgs[FILECOUNT] = _fileCount + declArgs[FILESIZE] = _fileSize + + if _maxQueueSize: + declArgs[MAX_QUEUE_SIZE] = _maxQueueSize + if _maxQueueCount: + declArgs[MAX_QUEUE_COUNT] = _maxQueueCount + if _policyType: + declArgs[POLICY_TYPE] = _policyType + if _clusterDurable: + declArgs[CLUSTER_DURABLE] = 1 + if _lvq: + declArgs[LVQ] = 1 + + self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs) + + def DelQueue (self, args): + if len (args) < 1: + Usage () + qname = args[0] + self.broker.getAmqpSession().queue_delete (queue=qname) + + def Bind (self, args): + if len (args) < 2: + Usage () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) + + def Unbind (self, args): + if len (args) < 2: + Usage () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) + + def findById (self, items, id): + for item in items: + if item.getObjectId() == id: + return item + return None + + def match (self, name, filter): + if filter == "": + return True + if name.find (filter) == -1: + return False + return True + +def YN (bool): + if bool: + return 'Y' + return 'N' + + +## +## Main Program +## + +try: + longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", + "file-size=", "max-queue-size=", "max-queue-count=", "policy-type=", + "last-value-queue", "sequence", "ive") + (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) +except: + Usage () + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-b" or opt[0] == "--bindings": + _recursive = True + if opt[0] == "-a" or opt[0] == "--broker-addr": + _host = opt[1] + if opt[0] == "--durable": + _durable = True + if opt[0] == "--cluster-durable": + _clusterDurable = True + if opt[0] == "--file-count": + _fileCount = int (opt[1]) + if opt[0] == "--file-size": + _fileSize = int (opt[1]) + if opt[0] == "--max-queue-size": + _maxQueueSize = int (opt[1]) + if opt[0] == "--max-queue-count": + _maxQueueCount = int (opt[1]) + if opt[0] == "--policy-type": + _policyType = opt[1] + if opt[0] == "--last-value-queue": + _lvq = True + if opt[0] == "--sequence": + _msgSequence = True + if opt[0] == "--ive": + _ive = True + +nargs = len (cargs) +bm = BrokerManager () + +try: + bm.SetBroker(_host) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd == "exchanges": + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd == "queues": + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) + else: + Usage () +except KeyboardInterrupt: + print +except Exception,e: + print "Failed:", e.args + sys.exit(1) + +bm.Disconnect() |
