From da8935e0491a2b8e9edb6671e5874b77516ee2a9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 09:32:24 +0000 Subject: QPID-6374 : Reorder ock aquisition and remove synchronization where it is not desired to reduce deadlocks git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658652 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 109 ++++++++++---------- .../java/org/apache/qpid/client/AMQSession.java | 111 ++++++++++----------- .../apache/qpid/client/BasicMessageConsumer.java | 41 ++++---- 3 files changed, 133 insertions(+), 128 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c37462011..c7fcde824a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -865,13 +865,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(long timeout) throws JMSException { - close(new ArrayList(_sessions.values()), timeout); - } + boolean closed; - public void close(List sessions, long timeout) throws JMSException - { - if (!setClosed()) + synchronized (_sessionCreationLock) + { + closed = setClosed(); + } + + if (!closed) { + List sessions = new ArrayList<>(_sessions.values()); + setClosing(true); try { @@ -886,54 +890,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List sessions, long timeout) throws JMSException { - synchronized (_sessionCreationLock) + if (!sessions.isEmpty()) { - if (!sessions.isEmpty()) + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) { - AMQSession session = sessions.remove(0); - synchronized (session.getMessageDeliveryLock()) - { - doClose(sessions, timeout); - } + doClose(sessions, timeout); } - else + } + else + { + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { try { - try - { - closeAllSessions(null, timeout); - } - finally - { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); - } + closeAllSessions(null, timeout); } - catch (JMSException e) + finally { - _logger.error("Error closing connection", e); - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } - finally + } + catch (JMSException e) + { + _logger.error("Error closing connection", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + finally + { + try { - try - { - _delegate.closeConnection(timeout); - } - catch (Exception e) - { - _logger.warn("Error closing underlying protocol connection", e); - } + _delegate.closeConnection(timeout); + } + catch (Exception e) + { + _logger.warn("Error closing underlying protocol connection", e); } } } } + } private void shutdownTaskPool(final long timeout) @@ -1308,28 +1310,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + // decide if we are going to close the session + if (hardError(cause)) { - // decide if we are going to close the session - if (hardError(cause)) + closer = (!setClosed()) || closer; { - closer = (!setClosed()) || closer; - { - _logger.info("Closing AMQConnection due to :" + cause); - } - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause); + _logger.info("Closing AMQConnection due to :" + cause); } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } + - // if we are closing the connection, close sessions first - if (closer) + // if we are closing the connection, close sessions first + if (closer) + { + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { try { - closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 29460bb42d..12e9285af8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -808,7 +808,16 @@ public abstract class AMQSession extends Closeable implements Messa if (sendClose) { + // The Synchronized block only needs to protect network traffic. - synchronized (_connection.getFailoverMutex()) + + try { - try + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) { - // If the session is open or we are in the process - // of closing the session then send a cance - // no point otherwise as the connection will be gone - if (!_session.isClosed() || _session.isClosing()) + synchronized(_session.getMessageDeliveryLock()) { - synchronized(_session.getMessageDeliveryLock()) + synchronized (_connection.getFailoverMutex()) { sendCancel(); } } } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } - catch (TransportException e) - { - throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); - } } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } + } else { -- cgit v1.2.1