diff options
| author | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
| commit | 49bc5a1a3bf61b521883027fc58b3950920f31f2 (patch) | |
| tree | 32a1a4c9efde38e0293da4e2662147819cac3115 /qpid/python/commands/qpid-config | |
| parent | 42650546ce3239e5de9e3c7335c89326012c2b93 (diff) | |
| download | qpid-python-49bc5a1a3bf61b521883027fc58b3950920f31f2.tar.gz | |
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/commands/qpid-config')
| -rwxr-xr-x | qpid/python/commands/qpid-config | 220 |
1 files changed, 74 insertions, 146 deletions
diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config index cc9315f7ea..6bc38c7440 100755 --- a/qpid/python/commands/qpid-config +++ b/qpid/python/commands/qpid-config @@ -22,16 +22,7 @@ import os import getopt import sys -import socket -import qpid -from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 -from qpid.util import connect -from time import sleep +from qpid import qmfconsole _recursive = False _host = "localhost" @@ -78,44 +69,21 @@ def Usage (): class BrokerManager: def __init__ (self): - self.dest = None - self.src = None - self.broker = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session (self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(brokerUrl) + + def Disconnect(self): + self.qmf.delBroker(self.broker) def Overview (self): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + queues = self.qmf.getObjects(name="queue") print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -136,11 +104,7 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") + exchanges = self.qmf.getObjects(name="exchange") print "Durable Type Bindings Exchange Name" print "=======================================================" for ex in exchanges: @@ -148,18 +112,14 @@ class BrokerManager: print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.id: + if bind.exchangeRef == ex.getObjectId(): qname = "<unknown>" queue = self.findById (queues, bind.queueRef) if queue != None: @@ -168,12 +128,8 @@ class BrokerManager: def QueueList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - queues = mc.syncGetObjects (mch, "queue") - journals = mc.syncGetObjects (mch, "journal") + queues = self.qmf.getObjects(name="queue") + journals = self.qmf.getObjects(name="journal") print " Store Size" print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" print "===========================================================================================" @@ -193,18 +149,14 @@ class BrokerManager: YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.id: + if bind.queueRef == queue.getObjectId(): ename = "<unknown>" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: @@ -216,30 +168,19 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () etype = args[0] ename = args[1] - - try: - self.session.exchange_declare (exchange=ename, type=etype, durable=_durable) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable) def DelExchange (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () ename = args[0] - - try: - self.session.exchange_delete (exchange=ename) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] declArgs = {} if _durable: @@ -251,55 +192,37 @@ class BrokerManager: if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - try: - self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] - - try: - self.session.queue_delete (queue=qname) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_delete (queue=qname) def Bind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: - if item.id == id: + if item.getObjectId() == id: return item return None @@ -343,43 +266,48 @@ for opt in optlist: nargs = len (cargs) bm = BrokerManager () -bm.SetBroker (Broker (_host)) - -if nargs == 0: - bm.Overview () -else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd[0] == 'e': - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd[0] == 'q': - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) + +try: + bm.SetBroker(qmfconsole.BrokerURL(_host)) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd[0] == 'e': + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd[0] == 'q': + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) else: Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + bm.Disconnect() |
