diff options
| author | Ted Ross <tross@apache.org> | 2008-10-31 18:56:24 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-31 18:56:24 +0000 |
| commit | a5049238140656ad5fbf2ada920b65b507b327d6 (patch) | |
| tree | c0f8aeb5e9d3ea5be6b489a885219a674a75ed37 /qpid/python | |
| parent | f29a0d9787daffa72a381864ae057e189d547702 (diff) | |
| download | qpid-python-a5049238140656ad5fbf2ada920b65b507b327d6.tar.gz | |
Federation enhancements and bug fixes:
qmfconsole.py - minor fixes, make sure object-dereference only queries one broker
Bridge.cpp - Added channel-id to queue name to avoid collisions
qpid-route - Added link-map feature for viewing the entire federated topology
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@709532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rwxr-xr-x | qpid/python/commands/qpid-route | 102 | ||||
| -rw-r--r-- | qpid/python/qpid/qmfconsole.py | 12 |
2 files changed, 110 insertions, 4 deletions
diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route index 7d6d3e333e..47eeef3ff2 100755 --- a/qpid/python/commands/qpid-route +++ b/qpid/python/commands/qpid-route @@ -29,6 +29,7 @@ def Usage (): print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" print " qpid-route [OPTIONS] link list [<dest-broker>]" + print " qpid-route [OPTIONS] link map [<broker>]" print print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]" print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" @@ -120,6 +121,80 @@ class RouteManager: print "%-16s%-8d %c %-18s%s" % \ (link.host, link.port, YN(link.durable), link.state, link.lastError) + def MapLinks(self): + qmf = self.qmf + print + print "Finding Linked Brokers:" + + brokerList = {} + brokerList[self.dest.name()] = self.broker + print " %s... Ok" % self.dest + + added = True + while added: + added = False + links = qmf.getObjects(_class="link") + for link in links: + url = qmfconsole.BrokerURL("%s:%d" % (link.host, link.port)) + if url.name() not in brokerList: + print " %s..." % url.name(), + try: + b = qmf.addBroker("%s:%d" % (link.host, link.port)) + brokerList[url.name()] = b + added = True + print "Ok" + except Exception, e: + print e + + print + print "Dynamic Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=True) + fedExchanges = [] + for bridge in bridges: + if bridge.src not in fedExchanges: + fedExchanges.append(bridge.src) + if len(fedExchanges) == 0: + print " none found" + else: + print + + for ex in fedExchanges: + print " Exchange %s:" % ex + pairs = [] + for bridge in bridges: + if bridge.src == ex: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + found = False + for pair in pairs: + if pair.matches(fromUrl, toUrl): + found = True + if not found: + pairs.append(RoutePair(fromUrl, toUrl)) + for pair in pairs: + print " %s" % pair + print + + print "Static Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=False) + if len(bridges) == 0: + print " none found" + else: + print + + for bridge in bridges: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + print " %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, bridge.src, bridge.key) + print + + for broker in brokerList: + if broker != self.dest.name(): + qmf.delBroker(brokerList[broker]) + + def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False): self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.match(self.src.host, self.src.port): @@ -240,6 +315,28 @@ class RouteManager: elif _verbose: print "Ok" +class RoutePair: + def __init__(self, fromUrl, toUrl): + self.fromUrl = fromUrl + self.toUrl = toUrl + self.bidir = False + + def __repr__(self): + if self.bidir: + delimit = "<=>" + else: + delimit = " =>" + return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) + + def matches(self, fromUrl, toUrl): + if fromUrl == self.fromUrl and toUrl == self.toUrl: + return True + if toUrl == self.fromUrl and fromUrl == self.toUrl: + self.bidir = True + return True + return False + + def YN(val): if val == 1: return 'Y' @@ -290,7 +387,9 @@ try: Usage() rm.DelLink (cargs[3]) elif cmd == "list": - rm.ListLinks () + rm.ListLinks() + elif cmd == "map": + rm.MapLinks() elif group == "dynamic": if cmd == "add": @@ -330,6 +429,7 @@ try: rm.ClearAllRoutes () else: Usage () + except Exception,e: print "Failed:", e.args[0] sys.exit(1) diff --git a/qpid/python/qpid/qmfconsole.py b/qpid/python/qpid/qmfconsole.py index f5c9af18cc..3ceb41b7a6 100644 --- a/qpid/python/qpid/qmfconsole.py +++ b/qpid/python/qpid/qmfconsole.py @@ -511,6 +511,8 @@ class Session: self.packages[pname][(cname, hash)] = _class finally: self.cv.release() + + self.seqMgr._release(seq) broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) @@ -759,7 +761,7 @@ class SchemaProperty: self.unit = None self.min = None self.max = None - self.maxlan = None + self.maxlen = None self.desc = None for key, value in map.items(): @@ -916,6 +918,10 @@ class Object(object): for statistic in schema.getStatistics(): self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) + def getBroker(self): + """ Return the broker from which this object was sent """ + return self._broker + def getObjectId(self): """ Return the object identifier for this object """ return self._objectId @@ -972,7 +978,7 @@ class Object(object): if name == property.name: return value if name == "_" + property.name + "_" and property.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value) + deref = self._session.getObjects(_objectId=value, _broker=self._broker) if len(deref) != 1: return None else: @@ -1090,6 +1096,7 @@ class Broker: self.error = None self.brokerId = None self.isConnected = False + self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) self._tryToConnect() def isConnected(self): @@ -1126,7 +1133,6 @@ class Broker: def _tryToConnect(self): try: - self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) sock = connect(self.host, self.port) if self.ssl: sock = ssl(sock) |
