summaryrefslogtreecommitdiff
path: root/qpid/extras
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-09-27 21:41:40 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-09-27 21:41:40 +0000
commit26b27fa73febae0c40a80f0a62f2c22ded6cdd6d (patch)
tree3468fef954e329f929d12051f05452d6a850b6e5 /qpid/extras
parent553a232fbb1ca15cbb36d5a5621f6611887a2d93 (diff)
downloadqpid-python-26b27fa73febae0c40a80f0a62f2c22ded6cdd6d.tar.gz
QPID-2862: immediately cancel a pending getObjects or method call if the broker disconnects.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1001917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras')
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py44
1 files changed, 32 insertions, 12 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 15a58eddd9..f96800db9c 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -442,15 +442,15 @@ class Object(object):
if seq:
if not sync:
return seq
+ self._broker.cv.acquire()
try:
- self._broker.cv.acquire()
starttime = time()
while self._broker.syncInFlight and self._broker.error == None:
self._broker.cv.wait(timeout)
if time() - starttime > timeout:
- self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
finally:
+ self._session.seqMgr._release(seq)
self._broker.cv.release()
if self._broker.error != None:
errorText = self._broker.error
@@ -2497,6 +2497,29 @@ class Broker(Thread):
def _send(self, msg, dest="qpid.management"):
self.amqpSession.message_transfer(destination=dest, message=msg)
+ def _disconnect(self, err_info=None):
+ """ Called when the remote broker has disconnected. Re-initializes all
+ state associated with the broker.
+ """
+ # notify any waiters, and callback
+ self.cv.acquire()
+ try:
+ if err_info is not None:
+ self.error = err_info
+ _agents = self.agents
+ self.agents = {}
+ for agent in _agents.itervalues():
+ agent.close()
+ self.syncInFlight = False
+ self.reqsOutstanding = 0
+ self.cv.notifyAll()
+ finally:
+ self.cv.release()
+
+ if self.session.console:
+ for agent in _agents:
+ self.session.console.delAgent(agent)
+
def _shutdown(self, _timeout=10):
""" Disconnect from a broker, and release its resources. Errors are
ignored.
@@ -2506,6 +2529,10 @@ class Broker(Thread):
self.canceled = True
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
self.join(_timeout)
+
+ # abort any pending transactions
+ self._disconnect("broker shutdown")
+
try:
if self.amqpSession:
self.amqpSession.close();
@@ -2729,7 +2756,7 @@ class Broker(Thread):
self.cv.acquire()
try:
self.connected = False
- self.error = data
+ self.error = "exception received from messaging layer: %s" % str(data)
finally:
self.cv.release()
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
@@ -2789,15 +2816,8 @@ class Broker(Thread):
item = None
break
- # notify any waiters, and callback
- self.cv.acquire()
- try:
- edata = self.error;
- if self.syncInFlight:
- self.cv.notify()
- finally:
- self.cv.release()
- self.session._handleError(edata)
+ self._disconnect() # clean up any pending agents
+ self.session._handleError(self.error)
self.session._handleBrokerDisconnect(self)
if not self.session.manageConnections: