summaryrefslogtreecommitdiff
path: root/qpid/python/commands/qpid-config
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
committerTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
commit49bc5a1a3bf61b521883027fc58b3950920f31f2 (patch)
tree32a1a4c9efde38e0293da4e2662147819cac3115 /qpid/python/commands/qpid-config
parent42650546ce3239e5de9e3c7335c89326012c2b93 (diff)
downloadqpid-python-49bc5a1a3bf61b521883027fc58b3950920f31f2.tar.gz
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/commands/qpid-config')
-rwxr-xr-xqpid/python/commands/qpid-config220
1 files changed, 74 insertions, 146 deletions
diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config
index cc9315f7ea..6bc38c7440 100755
--- a/qpid/python/commands/qpid-config
+++ b/qpid/python/commands/qpid-config
@@ -22,16 +22,7 @@
import os
import getopt
import sys
-import socket
-import qpid
-from threading import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes import uuid4
-from qpid.util import connect
-from time import sleep
+from qpid import qmfconsole
_recursive = False
_host = "localhost"
@@ -78,44 +69,21 @@ def Usage ():
class BrokerManager:
def __init__ (self):
- self.dest = None
- self.src = None
- self.broker = None
-
- def SetBroker (self, broker):
- self.broker = broker
-
- def ConnectToBroker (self):
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (self.broker.host, self.broker.port),
- username=self.broker.username, password=self.broker.password)
- self.conn.start ()
- self.session = self.conn.session (self.sessionId)
- self.mclient = managementClient (self.conn.spec)
- self.mchannel = self.mclient.addChannel (self.session)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
-
- def Disconnect (self):
- self.mclient.removeChannel (self.mchannel)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker (self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = qmfconsole.Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+
+ def Disconnect(self):
+ self.qmf.delBroker(self.broker)
def Overview (self):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ queues = self.qmf.getObjects(name="queue")
print "Total Exchanges: %d" % len (exchanges)
etype = {}
for ex in exchanges:
@@ -136,11 +104,7 @@ class BrokerManager:
print " non-durable: %d" % (len (queues) - _durable)
def ExchangeList (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
+ exchanges = self.qmf.getObjects(name="exchange")
print "Durable Type Bindings Exchange Name"
print "======================================================="
for ex in exchanges:
@@ -148,18 +112,14 @@ class BrokerManager:
print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
def ExchangeListRecurse (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- bindings = mc.syncGetObjects (mch, "binding")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ bindings = self.qmf.getObjects(name="binding")
+ queues = self.qmf.getObjects(name="queue")
for ex in exchanges:
if self.match (ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
for bind in bindings:
- if bind.exchangeRef == ex.id:
+ if bind.exchangeRef == ex.getObjectId():
qname = "<unknown>"
queue = self.findById (queues, bind.queueRef)
if queue != None:
@@ -168,12 +128,8 @@ class BrokerManager:
def QueueList (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- queues = mc.syncGetObjects (mch, "queue")
- journals = mc.syncGetObjects (mch, "journal")
+ queues = self.qmf.getObjects(name="queue")
+ journals = self.qmf.getObjects(name="journal")
print " Store Size"
print "Durable AutoDel Excl Bindings (files x file pages) Queue Name"
print "==========================================================================================="
@@ -193,18 +149,14 @@ class BrokerManager:
YN (q.exclusive), q.bindingCount, q.name)
def QueueListRecurse (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- bindings = mc.syncGetObjects (mch, "binding")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ bindings = self.qmf.getObjects(name="binding")
+ queues = self.qmf.getObjects(name="queue")
for queue in queues:
if self.match (queue.name, filter):
print "Queue '%s'" % queue.name
for bind in bindings:
- if bind.queueRef == queue.id:
+ if bind.queueRef == queue.getObjectId():
ename = "<unknown>"
ex = self.findById (exchanges, bind.exchangeRef)
if ex != None:
@@ -216,30 +168,19 @@ class BrokerManager:
def AddExchange (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
etype = args[0]
ename = args[1]
-
- try:
- self.session.exchange_declare (exchange=ename, type=etype, durable=_durable)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable)
def DelExchange (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
-
- try:
- self.session.exchange_delete (exchange=ename)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_delete (exchange=ename)
def AddQueue (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
qname = args[0]
declArgs = {}
if _durable:
@@ -251,55 +192,37 @@ class BrokerManager:
if _maxQueueCount:
declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
- try:
- self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs)
def DelQueue (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
qname = args[0]
-
- try:
- self.session.queue_delete (queue=qname)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().queue_delete (queue=qname)
def Bind (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
qname = args[1]
key = ""
if len (args) > 2:
key = args[2]
-
- try:
- self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key)
def Unbind (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
qname = args[1]
key = ""
if len (args) > 2:
key = args[2]
-
- try:
- self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
def findById (self, items, id):
for item in items:
- if item.id == id:
+ if item.getObjectId() == id:
return item
return None
@@ -343,43 +266,48 @@ for opt in optlist:
nargs = len (cargs)
bm = BrokerManager ()
-bm.SetBroker (Broker (_host))
-
-if nargs == 0:
- bm.Overview ()
-else:
- cmd = cargs[0]
- modifier = ""
- if nargs > 1:
- modifier = cargs[1]
- if cmd[0] == 'e':
- if _recursive:
- bm.ExchangeListRecurse (modifier)
- else:
- bm.ExchangeList (modifier)
- elif cmd[0] == 'q':
- if _recursive:
- bm.QueueListRecurse (modifier)
- else:
- bm.QueueList (modifier)
- elif cmd == "add":
- if modifier == "exchange":
- bm.AddExchange (cargs[2:])
- elif modifier == "queue":
- bm.AddQueue (cargs[2:])
- else:
- Usage ()
- elif cmd == "del":
- if modifier == "exchange":
- bm.DelExchange (cargs[2:])
- elif modifier == "queue":
- bm.DelQueue (cargs[2:])
+
+try:
+ bm.SetBroker(qmfconsole.BrokerURL(_host))
+ if nargs == 0:
+ bm.Overview ()
+ else:
+ cmd = cargs[0]
+ modifier = ""
+ if nargs > 1:
+ modifier = cargs[1]
+ if cmd[0] == 'e':
+ if _recursive:
+ bm.ExchangeListRecurse (modifier)
+ else:
+ bm.ExchangeList (modifier)
+ elif cmd[0] == 'q':
+ if _recursive:
+ bm.QueueListRecurse (modifier)
+ else:
+ bm.QueueList (modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.AddQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.DelQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "bind":
+ bm.Bind (cargs[1:])
+ elif cmd == "unbind":
+ bm.Unbind (cargs[1:])
else:
Usage ()
- elif cmd == "bind":
- bm.Bind (cargs[1:])
- elif cmd == "unbind":
- bm.Unbind (cargs[1:])
- else:
- Usage ()
+except Exception,e:
+ print "Failed:", e.message
+ sys.exit(1)
+
bm.Disconnect()