summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-31 18:56:24 +0000
committerTed Ross <tross@apache.org>2008-10-31 18:56:24 +0000
commita5049238140656ad5fbf2ada920b65b507b327d6 (patch)
treec0f8aeb5e9d3ea5be6b489a885219a674a75ed37 /qpid/python
parentf29a0d9787daffa72a381864ae057e189d547702 (diff)
downloadqpid-python-a5049238140656ad5fbf2ada920b65b507b327d6.tar.gz
Federation enhancements and bug fixes:
qmfconsole.py - minor fixes, make sure object-dereference only queries one broker Bridge.cpp - Added channel-id to queue name to avoid collisions qpid-route - Added link-map feature for viewing the entire federated topology git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@709532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rwxr-xr-xqpid/python/commands/qpid-route102
-rw-r--r--qpid/python/qpid/qmfconsole.py12
2 files changed, 110 insertions, 4 deletions
diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route
index 7d6d3e333e..47eeef3ff2 100755
--- a/qpid/python/commands/qpid-route
+++ b/qpid/python/commands/qpid-route
@@ -29,6 +29,7 @@ def Usage ():
print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link list [<dest-broker>]"
+ print " qpid-route [OPTIONS] link map [<broker>]"
print
print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
@@ -120,6 +121,80 @@ class RouteManager:
print "%-16s%-8d %c %-18s%s" % \
(link.host, link.port, YN(link.durable), link.state, link.lastError)
+ def MapLinks(self):
+ qmf = self.qmf
+ print
+ print "Finding Linked Brokers:"
+
+ brokerList = {}
+ brokerList[self.dest.name()] = self.broker
+ print " %s... Ok" % self.dest
+
+ added = True
+ while added:
+ added = False
+ links = qmf.getObjects(_class="link")
+ for link in links:
+ url = qmfconsole.BrokerURL("%s:%d" % (link.host, link.port))
+ if url.name() not in brokerList:
+ print " %s..." % url.name(),
+ try:
+ b = qmf.addBroker("%s:%d" % (link.host, link.port))
+ brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ else:
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ else:
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ print " %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, bridge.src, bridge.key)
+ print
+
+ for broker in brokerList:
+ if broker != self.dest.name():
+ qmf.delBroker(brokerList[broker])
+
+
def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False):
self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.match(self.src.host, self.src.port):
@@ -240,6 +315,28 @@ class RouteManager:
elif _verbose:
print "Ok"
+class RoutePair:
+ def __init__(self, fromUrl, toUrl):
+ self.fromUrl = fromUrl
+ self.toUrl = toUrl
+ self.bidir = False
+
+ def __repr__(self):
+ if self.bidir:
+ delimit = "<=>"
+ else:
+ delimit = " =>"
+ return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+ def matches(self, fromUrl, toUrl):
+ if fromUrl == self.fromUrl and toUrl == self.toUrl:
+ return True
+ if toUrl == self.fromUrl and fromUrl == self.toUrl:
+ self.bidir = True
+ return True
+ return False
+
+
def YN(val):
if val == 1:
return 'Y'
@@ -290,7 +387,9 @@ try:
Usage()
rm.DelLink (cargs[3])
elif cmd == "list":
- rm.ListLinks ()
+ rm.ListLinks()
+ elif cmd == "map":
+ rm.MapLinks()
elif group == "dynamic":
if cmd == "add":
@@ -330,6 +429,7 @@ try:
rm.ClearAllRoutes ()
else:
Usage ()
+
except Exception,e:
print "Failed:", e.args[0]
sys.exit(1)
diff --git a/qpid/python/qpid/qmfconsole.py b/qpid/python/qpid/qmfconsole.py
index f5c9af18cc..3ceb41b7a6 100644
--- a/qpid/python/qpid/qmfconsole.py
+++ b/qpid/python/qpid/qmfconsole.py
@@ -511,6 +511,8 @@ class Session:
self.packages[pname][(cname, hash)] = _class
finally:
self.cv.release()
+
+ self.seqMgr._release(seq)
broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
@@ -759,7 +761,7 @@ class SchemaProperty:
self.unit = None
self.min = None
self.max = None
- self.maxlan = None
+ self.maxlen = None
self.desc = None
for key, value in map.items():
@@ -916,6 +918,10 @@ class Object(object):
for statistic in schema.getStatistics():
self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
+ def getBroker(self):
+ """ Return the broker from which this object was sent """
+ return self._broker
+
def getObjectId(self):
""" Return the object identifier for this object """
return self._objectId
@@ -972,7 +978,7 @@ class Object(object):
if name == property.name:
return value
if name == "_" + property.name + "_" and property.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value)
+ deref = self._session.getObjects(_objectId=value, _broker=self._broker)
if len(deref) != 1:
return None
else:
@@ -1090,6 +1096,7 @@ class Broker:
self.error = None
self.brokerId = None
self.isConnected = False
+ self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
self._tryToConnect()
def isConnected(self):
@@ -1126,7 +1133,6 @@ class Broker:
def _tryToConnect(self):
try:
- self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
sock = connect(self.host, self.port)
if self.ssl:
sock = ssl(sock)