diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-19 22:12:27 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-19 22:12:27 +0000 |
| commit | 6c81457d6b3f54bc05a24f9d5a036aca7c887521 (patch) | |
| tree | 83afe39cf12d6650e3def6911c69c64b8e8a82ce | |
| parent | fbe5d1e5b2554cc00b8b128b4f50abae43007ed2 (diff) | |
| download | qpid-python-6c81457d6b3f54bc05a24f9d5a036aca7c887521.tar.gz | |
QPID-2810: clean up the broker thread properly on shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@987330 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | extras/qmf/src/py/qmf/console.py | 96 | ||||
| -rwxr-xr-x | tools/src/py/qpid-cluster | 21 | ||||
| -rwxr-xr-x | tools/src/py/qpid-config | 3 | ||||
| -rwxr-xr-x | tools/src/py/qpid-printevents | 24 | ||||
| -rwxr-xr-x | tools/src/py/qpid-route | 30 | ||||
| -rwxr-xr-x | tools/src/py/qpid-stat | 15 | ||||
| -rwxr-xr-x | tools/src/py/qpid-tool | 14 |
7 files changed, 151 insertions, 52 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 0a256d0bb5..3d514d7073 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -586,6 +586,20 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + def close(self): + """ Releases all resources held by the session. Must be called by the + application when it is done with the Session object. + """ + self.cv.acquire() + try: + while len(self.brokers): + b = self.brokers.pop() + try: + b._shutdown() + except: + pass + finally: + self.cv.release() def _getBrokerForAgentAddr(self, agent_addr): try: @@ -616,23 +630,23 @@ class Session: def addBroker(self, target="localhost", timeout=None, mechanisms=None): - """ Connect to a Qpid broker. Returns an object of type Broker. """ + """ Connect to a Qpid broker. Returns an object of type Broker. + Will raise an exception if the session is not managing the connection and + the connection setup to the broker fails. + """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass, ssl = url.scheme == URL.AMQPS, connTimeout=timeout) self.brokers.append(broker) - if not self.manageConnections: - broker._waitForStable() - agent = broker.getBrokerAgent() - if agent: - agent.getObjects(_class="agent") return broker def delBroker(self, broker): - """ Disconnect from a broker. The 'broker' argument is the object - returned from the addBroker call """ + """ Disconnect from a broker, and deallocate the broker proxy object. The + 'broker' argument is the object returned from the addBroker call. Errors + are ignored. + """ if self.console: for agent in broker.getAgents(): self.console.delAgent(agent) @@ -2053,6 +2067,13 @@ class Broker(Thread): self.data = data def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None): + """ Create a broker proxy and setup a connection to the broker. Will raise + an exception if the connection fails and the session is not configured to + retry connection setup (manageConnections = False). + + Spawns a thread to manage the broker connection. Call _shutdown() to + shutdown the thread when releasing the broker. + """ Thread.__init__(self) self.session = session self.host = host @@ -2071,6 +2092,8 @@ class Broker(Thread): self.brokerAgent = None self.brokerSupportsV2 = None self.rcv_queue = Queue() # for msg received on session + self.conn = None + self.amqpSession = None self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) Broker.nextSeq += 1 self.last_age_check = time() @@ -2083,10 +2106,20 @@ class Broker(Thread): self.start() if not self.session.manageConnections: # wait for connection setup to complete in subthread. - # On failure, propagate exeception to caller + # On failure, propagate exception to caller self.ready.acquire() if self.conn_exc: + self._shutdown() # wait for the subthread to clean up... raise self.conn_exc + # connection up - wait for stable... + try: + self._waitForStable() + agent = self.getBrokerAgent() + if agent: + agent.getObjects(_class="agent") + except: + self._shutdown() # wait for the subthread to clean up... + raise def isConnected(self): @@ -2173,6 +2206,10 @@ class Broker(Thread): self.cv.release() def _tryToConnect(self): + """ Connect to the broker. Returns True if connection setup completes + successfully, otherwise returns False and sets self.error/self.conn_exc + with error info. Does not raise exceptions. + """ self.error = None self.conn_exc = None try: @@ -2188,6 +2225,20 @@ class Broker(Thread): self.syncResult = None self.reqsOutstanding = 1 + try: + if self.amqpSession: + self.amqpSession.close() + except: + pass + self.amqpSession = None + + try: + if self.conn: + self.conn.close() + except: + pass + self.conn = None + sock = connect(self.host, self.port) sock.settimeout(5) oldTimeout = sock.gettimeout() @@ -2426,22 +2477,27 @@ class Broker(Thread): self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self, _timeout=10): + """ Disconnect from a broker, and release its resources. Errors are + ignored. + """ if self.isAlive(): # kick the thread self.canceled = True self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) self.join(_timeout) - if self.connected: - self.amqpSession.incoming("rdest").stop() - if self.session.console != None: - self.amqpSession.incoming("tdest").stop() - if self.brokerSupportsV2: - self.amqpSession.incoming("v2dest").stop() - self.amqpSession.incoming("v2TopicUI").stop() - self.amqpSession.incoming("v2TopicHB").stop() - self.amqpSession.close() - self.conn.close() - self.connected = False + try: + if self.amqpSession: + self.amqpSession.close(); + except: + pass + self.amqpSession = None + try: + if self.conn: + self.conn.close() + except: + pass + self.conn = None + self.connected = False def _waitForStable(self): try: diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster index e02cca7a88..b96d0d3694 100755 --- a/tools/src/py/qpid-cluster +++ b/tools/src/py/qpid-cluster @@ -90,6 +90,7 @@ class BrokerManager: self.brokerName = None self.qmf = None self.broker = None + self.brokers = [] def SetBroker(self, brokerUrl): self.url = brokerUrl @@ -101,8 +102,18 @@ class BrokerManager: self.brokerAgent = a def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) + """ Release any allocated brokers. Ignore any failures as the tool is + shutting down. + """ + try: + if self.broker: + self.qmf.delBroker(self.broker) + self.broker = None + while len(self.brokers): + b = self.brokers.pop() + self.qmf.delBroker(b) + except: + pass def _getClusters(self): packages = self.qmf.getPackages() @@ -195,7 +206,6 @@ class BrokerManager: hostList = self._getHostList(memberList) self.qmf.delBroker(self.broker) self.broker = None - self.brokers = [] idx = 0 for host in hostList: @@ -238,10 +248,10 @@ class BrokerManager: if self.config._delConn and not found: print "Client connection '%s' not found" % self.config._delConn - for broker in self.brokers: + while len(self.brokers): + broker = self.brokers.pop() self.qmf.delBroker(broker) - def main(argv=None): if argv is None: argv = sys.argv try: @@ -322,6 +332,7 @@ def main(argv=None): except KeyboardInterrupt: print except Exception,e: + bm.Disconnect() # try to deallocate brokers - ignores errors if str(e).find("connection aborted") > 0: # we expect this when asking the connected broker to shut down return 0 diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index a97b6cba94..cc7debc80a 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -574,13 +574,16 @@ except KeyboardInterrupt: print except IOError, e: print e + bm.Disconnect() sys.exit(1) except SystemExit, e: + bm.Disconnect() sys.exit(1) except Exception,e: if e.__class__.__name__ != "Timeout": # ignore Timeout exception, handle in the loop below print "Failed: %s: %s" % (e.__class__.__name__, e) + bm.Disconnect() sys.exit(1) while True: diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents index ed2155ad22..e14d85e12a 100755 --- a/tools/src/py/qpid-printevents +++ b/tools/src/py/qpid-printevents @@ -59,18 +59,20 @@ ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost console = EventConsole() session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True) brokers = [] - for host in arguments: - brokers.append(session.addBroker(host)) - try: - while (True): - sleep(10) - except KeyboardInterrupt: - for broker in brokers: - session.delBroker(broker) - print - sys.exit(0) - + for host in arguments: + brokers.append(session.addBroker(host)) + try: + while (True): + sleep(10) + except KeyboardInterrupt: + print + sys.exit(0) + finally: + while len(brokers): + b = brokers.pop() + session.delBroker(b) + if __name__ == '__main__': main() diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route index ceb0c6313e..be6bdf958c 100755 --- a/tools/src/py/qpid-route +++ b/tools/src/py/qpid-route @@ -71,6 +71,7 @@ _connTimeout = 10 class RouteManager: def __init__(self, localBroker): + self.brokerList = {} self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() @@ -79,7 +80,16 @@ class RouteManager: self.agent = self.broker.getBrokerAgent() def disconnect(self): - self.qmf.delBroker(self.broker) + try: + if self.broker: + self.qmf.delBroker(self.broker) + self.broker = None + while len(self.brokerList): + b = self.brokerList.popitem() + if b[0] != self.local.name(): + self.qmf.delBroker(b[1]) + except: + pass # ignore errors while shutting down def getLink(self): links = self.agent.getObjects(_class="link") @@ -135,8 +145,7 @@ class RouteManager: print print "Finding Linked Brokers:" - brokerList = {} - brokerList[self.local.name()] = self.broker + self.brokerList[self.local.name()] = self.broker print " %s... Ok" % self.local added = True @@ -145,11 +154,11 @@ class RouteManager: links = self.qmf.getObjects(_class="link") for link in links: url = BrokerURL("%s:%d" % (link.host, link.port)) - if url.name() not in brokerList: + if url.name() not in self.brokerList: print " %s..." % url.name(), try: b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout) - brokerList[url.name()] = b + self.brokerList[url.name()] = b added = True print "Ok" except Exception, e: @@ -217,10 +226,10 @@ class RouteManager: (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) print - for broker in brokerList: - if broker != self.local.name(): - self.qmf.delBroker(brokerList[broker]) - + while len(self.brokerList): + b = self.brokerList.popitem() + if b[0] != self.local.name(): + self.qmf.delBroker(b[1]) def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): if dynamic and _srclocal: @@ -463,6 +472,7 @@ else: group = cargs[0] cmd = cargs[1] +rm = None try: rm = RouteManager(localBroker) if group == "link": @@ -528,6 +538,8 @@ try: Usage() except Exception,e: + if rm: + rm.disconnect() # try to release broker resources print "Failed: %s - %s" % (e.__class__.__name__, e) sys.exit(1) diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat index 848596158f..ae5683ec54 100755 --- a/tools/src/py/qpid-stat +++ b/tools/src/py/qpid-stat @@ -161,10 +161,16 @@ class BrokerManager(Console): self.brokerAgent = a def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - else: - for b in self.brokers: self.qmf.delBroker(b.broker) + """ Release any allocated brokers. Ignore any failures as the tool is + shutting down. + """ + try: + if self.broker: + self.qmf.delBroker(self.broker) + else: + for b in self.brokers: self.qmf.delBroker(b.broker) + except: + pass def _getCluster(self): packages = self.qmf.getPackages() @@ -524,6 +530,7 @@ except KeyboardInterrupt: print except Exception,e: print "Failed: %s - %s" % (e.__class__.__name__, e) + bm.Disconnect() # try to deallocate brokers raise # FIXME aconway 2010-03-03: sys.exit(1) diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool index 3a2a340080..cebd865501 100755 --- a/tools/src/py/qpid-tool +++ b/tools/src/py/qpid-tool @@ -191,8 +191,12 @@ class QmfData(Console): self.cli = cli def close(self): - self.closing = True - self.session.delBroker(self.broker) + try: + self.closing = True + if self.session and self.broker: + self.session.delBroker(self.broker) + except: + pass # we're shutting down - ignore any errors def classCompletions(self, text): pass @@ -645,4 +649,8 @@ try: except KeyboardInterrupt: print print "Exiting..." - data.close() +except Exception, e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + +# alway attempt to cleanup broker resources +data.close() |
