diff options
| author | Keith Wall <kwall@apache.org> | 2014-11-06 14:28:30 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-11-06 14:28:30 +0000 |
| commit | e054c7a1153c0b112efda73c524e6da3c1b252f6 (patch) | |
| tree | 9383eb3356a047c00e940d894d67c38ef8481694 /qpid/java | |
| parent | cf5cb805e3b72258a0d4e2ac92d6c54a5aa9ca28 (diff) | |
| download | qpid-python-e054c7a1153c0b112efda73c524e6da3c1b252f6.tar.gz | |
QPID-6125: [Java Broker] Partial revert so that closeConnection will once again await for an already closing connection to become closed
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1637118 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 75 |
1 files changed, 44 insertions, 31 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index b2b2f904a6..8b29c33e52 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -103,6 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; + private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; @@ -765,7 +766,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() { @@ -813,34 +813,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } else { - synchronized(this) - { - - boolean lockHeld = _receivedLock.isHeldByCurrentThread(); - - while(!_closed) - { - try - { - if(lockHeld) - { - _receivedLock.unlock(); - } - wait(1000); - } - catch (InterruptedException e) - { - // do nothing - } - finally - { - if(lockHeld) - { - _receivedLock.lock(); - } - } - } - } + awaitClosed(); } } else @@ -858,6 +831,44 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + private void awaitClosed() + { + synchronized(this) + { + final boolean lockHeld = _receivedLock.isHeldByCurrentThread(); + final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT; + + while(!_closed && endTime > System.currentTimeMillis()) + { + try + { + if(lockHeld) + { + _receivedLock.unlock(); + } + wait(1000); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + break; + } + finally + { + if(lockHeld) + { + _receivedLock.lock(); + } + } + } + + if (!_closed) + { + throw new ConnectionScopedRuntimeException("Connection " + this + " failed to become closed within " + AWAIT_CLOSED_TIMEOUT + "ms."); + } + } + } + private void closeConnection(int channelId, AMQConnectionException e) { @@ -901,8 +912,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } } - - + else + { + awaitClosed(); + } } public void closeProtocolSession() |
