summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-08-19 22:12:27 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-08-19 22:12:27 +0000
commit6c81457d6b3f54bc05a24f9d5a036aca7c887521 (patch)
tree83afe39cf12d6650e3def6911c69c64b8e8a82ce
parentfbe5d1e5b2554cc00b8b128b4f50abae43007ed2 (diff)
downloadqpid-python-6c81457d6b3f54bc05a24f9d5a036aca7c887521.tar.gz
QPID-2810: clean up the broker thread properly on shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@987330 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/qmf/src/py/qmf/console.py96
-rwxr-xr-xtools/src/py/qpid-cluster21
-rwxr-xr-xtools/src/py/qpid-config3
-rwxr-xr-xtools/src/py/qpid-printevents24
-rwxr-xr-xtools/src/py/qpid-route30
-rwxr-xr-xtools/src/py/qpid-stat15
-rwxr-xr-xtools/src/py/qpid-tool14
7 files changed, 151 insertions, 52 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index 0a256d0bb5..3d514d7073 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -586,6 +586,20 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+ def close(self):
+ """ Releases all resources held by the session. Must be called by the
+ application when it is done with the Session object.
+ """
+ self.cv.acquire()
+ try:
+ while len(self.brokers):
+ b = self.brokers.pop()
+ try:
+ b._shutdown()
+ except:
+ pass
+ finally:
+ self.cv.release()
def _getBrokerForAgentAddr(self, agent_addr):
try:
@@ -616,23 +630,23 @@ class Session:
def addBroker(self, target="localhost", timeout=None, mechanisms=None):
- """ Connect to a Qpid broker. Returns an object of type Broker. """
+ """ Connect to a Qpid broker. Returns an object of type Broker.
+ Will raise an exception if the session is not managing the connection and
+ the connection setup to the broker fails.
+ """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass,
ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
self.brokers.append(broker)
- if not self.manageConnections:
- broker._waitForStable()
- agent = broker.getBrokerAgent()
- if agent:
- agent.getObjects(_class="agent")
return broker
def delBroker(self, broker):
- """ Disconnect from a broker. The 'broker' argument is the object
- returned from the addBroker call """
+ """ Disconnect from a broker, and deallocate the broker proxy object. The
+ 'broker' argument is the object returned from the addBroker call. Errors
+ are ignored.
+ """
if self.console:
for agent in broker.getAgents():
self.console.delAgent(agent)
@@ -2053,6 +2067,13 @@ class Broker(Thread):
self.data = data
def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
+ """ Create a broker proxy and setup a connection to the broker. Will raise
+ an exception if the connection fails and the session is not configured to
+ retry connection setup (manageConnections = False).
+
+ Spawns a thread to manage the broker connection. Call _shutdown() to
+ shutdown the thread when releasing the broker.
+ """
Thread.__init__(self)
self.session = session
self.host = host
@@ -2071,6 +2092,8 @@ class Broker(Thread):
self.brokerAgent = None
self.brokerSupportsV2 = None
self.rcv_queue = Queue() # for msg received on session
+ self.conn = None
+ self.amqpSession = None
self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
Broker.nextSeq += 1
self.last_age_check = time()
@@ -2083,10 +2106,20 @@ class Broker(Thread):
self.start()
if not self.session.manageConnections:
# wait for connection setup to complete in subthread.
- # On failure, propagate exeception to caller
+ # On failure, propagate exception to caller
self.ready.acquire()
if self.conn_exc:
+ self._shutdown() # wait for the subthread to clean up...
raise self.conn_exc
+ # connection up - wait for stable...
+ try:
+ self._waitForStable()
+ agent = self.getBrokerAgent()
+ if agent:
+ agent.getObjects(_class="agent")
+ except:
+ self._shutdown() # wait for the subthread to clean up...
+ raise
def isConnected(self):
@@ -2173,6 +2206,10 @@ class Broker(Thread):
self.cv.release()
def _tryToConnect(self):
+ """ Connect to the broker. Returns True if connection setup completes
+ successfully, otherwise returns False and sets self.error/self.conn_exc
+ with error info. Does not raise exceptions.
+ """
self.error = None
self.conn_exc = None
try:
@@ -2188,6 +2225,20 @@ class Broker(Thread):
self.syncResult = None
self.reqsOutstanding = 1
+ try:
+ if self.amqpSession:
+ self.amqpSession.close()
+ except:
+ pass
+ self.amqpSession = None
+
+ try:
+ if self.conn:
+ self.conn.close()
+ except:
+ pass
+ self.conn = None
+
sock = connect(self.host, self.port)
sock.settimeout(5)
oldTimeout = sock.gettimeout()
@@ -2426,22 +2477,27 @@ class Broker(Thread):
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self, _timeout=10):
+ """ Disconnect from a broker, and release its resources. Errors are
+ ignored.
+ """
if self.isAlive():
# kick the thread
self.canceled = True
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
self.join(_timeout)
- if self.connected:
- self.amqpSession.incoming("rdest").stop()
- if self.session.console != None:
- self.amqpSession.incoming("tdest").stop()
- if self.brokerSupportsV2:
- self.amqpSession.incoming("v2dest").stop()
- self.amqpSession.incoming("v2TopicUI").stop()
- self.amqpSession.incoming("v2TopicHB").stop()
- self.amqpSession.close()
- self.conn.close()
- self.connected = False
+ try:
+ if self.amqpSession:
+ self.amqpSession.close();
+ except:
+ pass
+ self.amqpSession = None
+ try:
+ if self.conn:
+ self.conn.close()
+ except:
+ pass
+ self.conn = None
+ self.connected = False
def _waitForStable(self):
try:
diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster
index e02cca7a88..b96d0d3694 100755
--- a/tools/src/py/qpid-cluster
+++ b/tools/src/py/qpid-cluster
@@ -90,6 +90,7 @@ class BrokerManager:
self.brokerName = None
self.qmf = None
self.broker = None
+ self.brokers = []
def SetBroker(self, brokerUrl):
self.url = brokerUrl
@@ -101,8 +102,18 @@ class BrokerManager:
self.brokerAgent = a
def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokers):
+ b = self.brokers.pop()
+ self.qmf.delBroker(b)
+ except:
+ pass
def _getClusters(self):
packages = self.qmf.getPackages()
@@ -195,7 +206,6 @@ class BrokerManager:
hostList = self._getHostList(memberList)
self.qmf.delBroker(self.broker)
self.broker = None
- self.brokers = []
idx = 0
for host in hostList:
@@ -238,10 +248,10 @@ class BrokerManager:
if self.config._delConn and not found:
print "Client connection '%s' not found" % self.config._delConn
- for broker in self.brokers:
+ while len(self.brokers):
+ broker = self.brokers.pop()
self.qmf.delBroker(broker)
-
def main(argv=None):
if argv is None: argv = sys.argv
try:
@@ -322,6 +332,7 @@ def main(argv=None):
except KeyboardInterrupt:
print
except Exception,e:
+ bm.Disconnect() # try to deallocate brokers - ignores errors
if str(e).find("connection aborted") > 0:
# we expect this when asking the connected broker to shut down
return 0
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index a97b6cba94..cc7debc80a 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -574,13 +574,16 @@ except KeyboardInterrupt:
print
except IOError, e:
print e
+ bm.Disconnect()
sys.exit(1)
except SystemExit, e:
+ bm.Disconnect()
sys.exit(1)
except Exception,e:
if e.__class__.__name__ != "Timeout":
# ignore Timeout exception, handle in the loop below
print "Failed: %s: %s" % (e.__class__.__name__, e)
+ bm.Disconnect()
sys.exit(1)
while True:
diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents
index ed2155ad22..e14d85e12a 100755
--- a/tools/src/py/qpid-printevents
+++ b/tools/src/py/qpid-printevents
@@ -59,18 +59,20 @@ ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
console = EventConsole()
session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True)
brokers = []
- for host in arguments:
- brokers.append(session.addBroker(host))
-
try:
- while (True):
- sleep(10)
- except KeyboardInterrupt:
- for broker in brokers:
- session.delBroker(broker)
- print
- sys.exit(0)
-
+ for host in arguments:
+ brokers.append(session.addBroker(host))
+ try:
+ while (True):
+ sleep(10)
+ except KeyboardInterrupt:
+ print
+ sys.exit(0)
+ finally:
+ while len(brokers):
+ b = brokers.pop()
+ session.delBroker(b)
+
if __name__ == '__main__':
main()
diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route
index ceb0c6313e..be6bdf958c 100755
--- a/tools/src/py/qpid-route
+++ b/tools/src/py/qpid-route
@@ -71,6 +71,7 @@ _connTimeout = 10
class RouteManager:
def __init__(self, localBroker):
+ self.brokerList = {}
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
@@ -79,7 +80,16 @@ class RouteManager:
self.agent = self.broker.getBrokerAgent()
def disconnect(self):
- self.qmf.delBroker(self.broker)
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+ except:
+ pass # ignore errors while shutting down
def getLink(self):
links = self.agent.getObjects(_class="link")
@@ -135,8 +145,7 @@ class RouteManager:
print
print "Finding Linked Brokers:"
- brokerList = {}
- brokerList[self.local.name()] = self.broker
+ self.brokerList[self.local.name()] = self.broker
print " %s... Ok" % self.local
added = True
@@ -145,11 +154,11 @@ class RouteManager:
links = self.qmf.getObjects(_class="link")
for link in links:
url = BrokerURL("%s:%d" % (link.host, link.port))
- if url.name() not in brokerList:
+ if url.name() not in self.brokerList:
print " %s..." % url.name(),
try:
b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
- brokerList[url.name()] = b
+ self.brokerList[url.name()] = b
added = True
print "Ok"
except Exception, e:
@@ -217,10 +226,10 @@ class RouteManager:
(toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
print
- for broker in brokerList:
- if broker != self.local.name():
- self.qmf.delBroker(brokerList[broker])
-
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
if dynamic and _srclocal:
@@ -463,6 +472,7 @@ else:
group = cargs[0]
cmd = cargs[1]
+rm = None
try:
rm = RouteManager(localBroker)
if group == "link":
@@ -528,6 +538,8 @@ try:
Usage()
except Exception,e:
+ if rm:
+ rm.disconnect() # try to release broker resources
print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
index 848596158f..ae5683ec54 100755
--- a/tools/src/py/qpid-stat
+++ b/tools/src/py/qpid-stat
@@ -161,10 +161,16 @@ class BrokerManager(Console):
self.brokerAgent = a
def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
- else:
- for b in self.brokers: self.qmf.delBroker(b.broker)
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ else:
+ for b in self.brokers: self.qmf.delBroker(b.broker)
+ except:
+ pass
def _getCluster(self):
packages = self.qmf.getPackages()
@@ -524,6 +530,7 @@ except KeyboardInterrupt:
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
+ bm.Disconnect() # try to deallocate brokers
raise # FIXME aconway 2010-03-03:
sys.exit(1)
diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool
index 3a2a340080..cebd865501 100755
--- a/tools/src/py/qpid-tool
+++ b/tools/src/py/qpid-tool
@@ -191,8 +191,12 @@ class QmfData(Console):
self.cli = cli
def close(self):
- self.closing = True
- self.session.delBroker(self.broker)
+ try:
+ self.closing = True
+ if self.session and self.broker:
+ self.session.delBroker(self.broker)
+ except:
+ pass # we're shutting down - ignore any errors
def classCompletions(self, text):
pass
@@ -645,4 +649,8 @@ try:
except KeyboardInterrupt:
print
print "Exiting..."
- data.close()
+except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()