diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-27 21:41:40 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-27 21:41:40 +0000 |
| commit | 26b27fa73febae0c40a80f0a62f2c22ded6cdd6d (patch) | |
| tree | 3468fef954e329f929d12051f05452d6a850b6e5 /qpid/extras | |
| parent | 553a232fbb1ca15cbb36d5a5621f6611887a2d93 (diff) | |
| download | qpid-python-26b27fa73febae0c40a80f0a62f2c22ded6cdd6d.tar.gz | |
QPID-2862: immediately cancel a pending getObjects or method call if the broker disconnects.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1001917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras')
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 15a58eddd9..f96800db9c 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -442,15 +442,15 @@ class Object(object): if seq: if not sync: return seq + self._broker.cv.acquire() try: - self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: self._broker.cv.wait(timeout) if time() - starttime > timeout: - self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: + self._session.seqMgr._release(seq) self._broker.cv.release() if self._broker.error != None: errorText = self._broker.error @@ -2497,6 +2497,29 @@ class Broker(Thread): def _send(self, msg, dest="qpid.management"): self.amqpSession.message_transfer(destination=dest, message=msg) + def _disconnect(self, err_info=None): + """ Called when the remote broker has disconnected. Re-initializes all + state associated with the broker. + """ + # notify any waiters, and callback + self.cv.acquire() + try: + if err_info is not None: + self.error = err_info + _agents = self.agents + self.agents = {} + for agent in _agents.itervalues(): + agent.close() + self.syncInFlight = False + self.reqsOutstanding = 0 + self.cv.notifyAll() + finally: + self.cv.release() + + if self.session.console: + for agent in _agents: + self.session.console.delAgent(agent) + def _shutdown(self, _timeout=10): """ Disconnect from a broker, and release its resources. Errors are ignored. @@ -2506,6 +2529,10 @@ class Broker(Thread): self.canceled = True self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) self.join(_timeout) + + # abort any pending transactions + self._disconnect("broker shutdown") + try: if self.amqpSession: self.amqpSession.close(); @@ -2729,7 +2756,7 @@ class Broker(Thread): self.cv.acquire() try: self.connected = False - self.error = data + self.error = "exception received from messaging layer: %s" % str(data) finally: self.cv.release() self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) @@ -2789,15 +2816,8 @@ class Broker(Thread): item = None break - # notify any waiters, and callback - self.cv.acquire() - try: - edata = self.error; - if self.syncInFlight: - self.cv.notify() - finally: - self.cv.release() - self.session._handleError(edata) + self._disconnect() # clean up any pending agents + self.session._handleError(self.error) self.session._handleBrokerDisconnect(self) if not self.session.manageConnections: |
