summaryrefslogtreecommitdiff
path: root/RC9/qpid/python/commands/qpid-config
diff options
context:
space:
mode:
Diffstat (limited to 'RC9/qpid/python/commands/qpid-config')
-rwxr-xr-xRC9/qpid/python/commands/qpid-config392
1 files changed, 392 insertions, 0 deletions
diff --git a/RC9/qpid/python/commands/qpid-config b/RC9/qpid/python/commands/qpid-config
new file mode 100755
index 0000000000..ff3c7db46e
--- /dev/null
+++ b/RC9/qpid/python/commands/qpid-config
@@ -0,0 +1,392 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+from qmf.console import Session
+
+_recursive = False
+_host = "localhost"
+_durable = False
+_clusterDurable = False
+_fileCount = 8
+_fileSize = 24
+_maxQueueSize = None
+_maxQueueCount = None
+_policyType = None
+_lvq = False
+_msgSequence = False
+_ive = False
+
+FILECOUNT = "qpid.file_count"
+FILESIZE = "qpid.file_size"
+MAX_QUEUE_SIZE = "qpid.max_size"
+MAX_QUEUE_COUNT = "qpid.max_count"
+POLICY_TYPE = "qpid.policy_type"
+CLUSTER_DURABLE = "qpid.persist_last_node"
+LVQ = "qpid.last_value_queue"
+MSG_SEQUENCE = "qpid.msg_sequence"
+IVE = "qpid.ive"
+
+def Usage ():
+ print "Usage: qpid-config [OPTIONS]"
+ print " qpid-config [OPTIONS] exchanges [filter-string]"
+ print " qpid-config [OPTIONS] queues [filter-string]"
+ print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]"
+ print " qpid-config [OPTIONS] del exchange <name>"
+ print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]"
+ print " qpid-config [OPTIONS] del queue <name>"
+ print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]"
+ print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"
+ print
+ print "Options:"
+ print " -b [ --bindings ] Show bindings in queue or exchange list"
+ print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker"
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "Add Queue Options:"
+ print " --durable Queue is durable"
+ print " --cluster-durable Queue becomes durable if there is only one functioning cluster node"
+ print " --file-count N (8) Number of files in queue's persistence journal"
+ print " --file-size N (24) File size in pages (64Kib/page)"
+ print " --max-queue-size N Maximum in-memory queue size as bytes"
+ print " --max-queue-count N Maximum in-memory queue size as a number of messages"
+ print " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)"
+ print " --last-value-queue Enable LVQ behavior on the queue"
+ print
+ print "Add Exchange Options:"
+ print " --durable Exchange is durable"
+ print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header"
+ print " with a value that increments for each message forwarded."
+ print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference"
+ print " to the last message forwarded and enqueuing that message to newly bound"
+ print " queues."
+ print
+ sys.exit (1)
+
+class BrokerManager:
+ def __init__ (self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker (self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def Overview (self):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ print "Total Exchanges: %d" % len (exchanges)
+ etype = {}
+ for ex in exchanges:
+ if ex.type not in etype:
+ etype[ex.type] = 1
+ else:
+ etype[ex.type] = etype[ex.type] + 1
+ for typ in etype:
+ print "%15s: %d" % (typ, etype[typ])
+
+ print
+ print " Total Queues: %d" % len (queues)
+ _durable = 0
+ for queue in queues:
+ if queue.durable:
+ _durable = _durable + 1
+ print " durable: %d" % _durable
+ print " non-durable: %d" % (len (queues) - _durable)
+
+ def ExchangeList (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ caption1 = "Type "
+ caption2 = "Exchange Name"
+ maxNameLen = len(caption2)
+ for ex in exchanges:
+ if self.match(ex.name, filter):
+ if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
+ print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
+ line = ""
+ for i in range(((maxNameLen + len(caption1)) / 5) + 5):
+ line += "====="
+ print line
+
+ for ex in exchanges:
+ if self.match (ex.name, filter):
+ print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
+ args = ex.arguments
+ if ex.durable: print "--durable",
+ if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
+ if IVE in args and args[IVE] == 1: print "--ive",
+ print
+
+ def ExchangeListRecurse (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for ex in exchanges:
+ if self.match (ex.name, filter):
+ print "Exchange '%s' (%s)" % (ex.name, ex.type)
+ for bind in bindings:
+ if bind.exchangeRef == ex.getObjectId():
+ qname = "<unknown>"
+ queue = self.findById (queues, bind.queueRef)
+ if queue != None:
+ qname = queue.name
+ print " bind [%s] => %s" % (bind.bindingKey, qname)
+
+
+ def QueueList (self, filter):
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+
+ caption = "Queue Name"
+ maxNameLen = len(caption)
+ for q in queues:
+ if self.match (q.name, filter):
+ if len(q.name) > maxNameLen: maxNameLen = len(q.name)
+ print "%-*s Attributes" % (maxNameLen, caption)
+ line = ""
+ for i in range((maxNameLen / 5) + 5):
+ line += "====="
+ print line
+
+ for q in queues:
+ if self.match (q.name, filter):
+ print "%-*s " % (maxNameLen, q.name),
+ args = q.arguments
+ if q.durable: print "--durable",
+ if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
+ if q.autoDelete: print "auto-del",
+ if q.exclusive: print "excl",
+ if FILESIZE in args: print "--file-size=%d" % args[FILESIZE],
+ if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT],
+ if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE],
+ if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
+ if POLICY_TYPE in args: print "--policy-type=%s" % args[POLICY_TYPE],
+ if LVQ in args and args[LVQ] == 1: print "--last-value-queue",
+ print
+
+ def QueueListRecurse (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for queue in queues:
+ if self.match (queue.name, filter):
+ print "Queue '%s'" % queue.name
+ for bind in bindings:
+ if bind.queueRef == queue.getObjectId():
+ ename = "<unknown>"
+ ex = self.findById (exchanges, bind.exchangeRef)
+ if ex != None:
+ ename = ex.name
+ if ename == "":
+ ename = "''"
+ print " bind [%s] => %s" % (bind.bindingKey, ename)
+
+ def AddExchange (self, args):
+ if len (args) < 2:
+ Usage ()
+ etype = args[0]
+ ename = args[1]
+ declArgs = {}
+ if _msgSequence:
+ declArgs[MSG_SEQUENCE] = 1
+ if _ive:
+ declArgs[IVE] = 1
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable, arguments=declArgs)
+
+ def DelExchange (self, args):
+ if len (args) < 1:
+ Usage ()
+ ename = args[0]
+ self.broker.getAmqpSession().exchange_delete (exchange=ename)
+
+ def AddQueue (self, args):
+ if len (args) < 1:
+ Usage ()
+ qname = args[0]
+ declArgs = {}
+ if _durable:
+ declArgs[FILECOUNT] = _fileCount
+ declArgs[FILESIZE] = _fileSize
+
+ if _maxQueueSize:
+ declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
+ if _maxQueueCount:
+ declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
+ if _policyType:
+ declArgs[POLICY_TYPE] = _policyType
+ if _clusterDurable:
+ declArgs[CLUSTER_DURABLE] = 1
+ if _lvq:
+ declArgs[LVQ] = 1
+
+ self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs)
+
+ def DelQueue (self, args):
+ if len (args) < 1:
+ Usage ()
+ qname = args[0]
+ self.broker.getAmqpSession().queue_delete (queue=qname)
+
+ def Bind (self, args):
+ if len (args) < 2:
+ Usage ()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len (args) > 2:
+ key = args[2]
+ self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key)
+
+ def Unbind (self, args):
+ if len (args) < 2:
+ Usage ()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len (args) > 2:
+ key = args[2]
+ self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
+
+ def findById (self, items, id):
+ for item in items:
+ if item.getObjectId() == id:
+ return item
+ return None
+
+ def match (self, name, filter):
+ if filter == "":
+ return True
+ if name.find (filter) == -1:
+ return False
+ return True
+
+def YN (bool):
+ if bool:
+ return 'Y'
+ return 'N'
+
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
+ "file-size=", "max-queue-size=", "max-queue-count=", "policy-type=",
+ "last-value-queue", "sequence", "ive")
+ (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
+except:
+ Usage ()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-b" or opt[0] == "--bindings":
+ _recursive = True
+ if opt[0] == "-a" or opt[0] == "--broker-addr":
+ _host = opt[1]
+ if opt[0] == "--durable":
+ _durable = True
+ if opt[0] == "--cluster-durable":
+ _clusterDurable = True
+ if opt[0] == "--file-count":
+ _fileCount = int (opt[1])
+ if opt[0] == "--file-size":
+ _fileSize = int (opt[1])
+ if opt[0] == "--max-queue-size":
+ _maxQueueSize = int (opt[1])
+ if opt[0] == "--max-queue-count":
+ _maxQueueCount = int (opt[1])
+ if opt[0] == "--policy-type":
+ _policyType = opt[1]
+ if opt[0] == "--last-value-queue":
+ _lvq = True
+ if opt[0] == "--sequence":
+ _msgSequence = True
+ if opt[0] == "--ive":
+ _ive = True
+
+nargs = len (cargs)
+bm = BrokerManager ()
+
+try:
+ bm.SetBroker(_host)
+ if nargs == 0:
+ bm.Overview ()
+ else:
+ cmd = cargs[0]
+ modifier = ""
+ if nargs > 1:
+ modifier = cargs[1]
+ if cmd == "exchanges":
+ if _recursive:
+ bm.ExchangeListRecurse (modifier)
+ else:
+ bm.ExchangeList (modifier)
+ elif cmd == "queues":
+ if _recursive:
+ bm.QueueListRecurse (modifier)
+ else:
+ bm.QueueList (modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.AddQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.DelQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "bind":
+ bm.Bind (cargs[1:])
+ elif cmd == "unbind":
+ bm.Unbind (cargs[1:])
+ else:
+ Usage ()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ print "Failed:", e.args
+ sys.exit(1)
+
+bm.Disconnect()