diff options
| author | Keith Wall <kwall@apache.org> | 2012-07-12 12:19:34 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-07-12 12:19:34 +0000 |
| commit | 38c28485ec354e20c5d5365ed34ca2076177e5ef (patch) | |
| tree | d47b3b83e3d6ce8066b27ef14bb315ee9fd43979 /java | |
| parent | 26aad66b25a1be097c28486e22335704d5786c9f (diff) | |
| download | qpid-python-38c28485ec354e20c5d5365ed34ca2076177e5ef.tar.gz | |
QPID-4131: On 0-8...0-9-1 code path broker now closes the connection when the housekeeping thread times out a transaction. AMQChannel now uses AMQProtocolEngine's _receivedLock so that this connection-closing is thread-safe. This gives better compatibility with older clients that do not hand session closes correctly. 0-10 behaviour unaffected by this change.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1360651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 40 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fead99a79d..a6218b8255 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -36,6 +36,8 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; @@ -1552,18 +1554,37 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); } - // Close session for idle or open transactions that have timed out + // Close _connection_ for idle or open transactions that have timed out (this is different + // than the 0-10 code path which closes the session). if (idleClose > 0L && idleTime > idleClose) { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + closeConnection("Idle transaction timed out"); } else if (openClose > 0L && openTime > openClose) { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + closeConnection("Open transaction timed out"); } } } + /** + * Typically called from the HouseKeepingThread instead of the main receiver thread, + * therefore uses a lock to close the connection in a thread-safe manner. + */ + private void closeConnection(String reason) throws AMQException + { + Lock receivedLock = _session.getReceivedLock(); + receivedLock.lock(); + try + { + _session.close(AMQConstant.RESOURCE_ERROR, reason); + } + finally + { + receivedLock.unlock(); + } + } + public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index de61a03aff..1e649c3cb7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1323,25 +1323,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - _receivedLock.lock(); - try - { - int channelId = ((AMQChannel)session).getChannelId(); - closeChannel(channelId); + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - new AMQShortString(message), - 0,0); + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + new AMQShortString(message), + 0,0); - writeFrame(responseBody.generateFrame(channelId)); - } - finally - { - _receivedLock.unlock(); - } + writeFrame(responseBody.generateFrame(channelId)); } public void close(AMQConstant cause, String message) throws AMQException @@ -1497,4 +1489,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _reference; } + + public Lock getReceivedLock() + { + return _receivedLock; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 01666ca58b..ba806c04bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol; import java.net.SocketAddress; import java.security.Principal; import java.util.List; +import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; import javax.security.sasl.SaslServer; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -217,4 +217,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public Principal getPeerPrincipal(); + Lock getReceivedLock(); } |
