diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-20 15:54:15 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-20 15:54:15 +0000 |
| commit | 1080a1c708c09fc2524de3a7d16b46f275e07d24 (patch) | |
| tree | c23b2f5a4b24c584b72d5b412126eb7336ec386e /qpid/java | |
| parent | 1ee9cd5f7d43bd8c8117eab317842dd3783fac63 (diff) | |
| download | qpid-python-1080a1c708c09fc2524de3a7d16b46f275e07d24.tar.gz | |
QPID-5571 : Java Broker AMQP 1.0 deadlock on close consumer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570245 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 52 insertions, 26 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 301dd0695a..434f939a21 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -300,14 +300,20 @@ public abstract class LinkEndpoint<T extends LinkEventListener> } } - public synchronized boolean isAttached() + public boolean isAttached() { - return _state == State.ATTACHED; + synchronized (getLock()) + { + return _state == State.ATTACHED; + } } - public synchronized boolean isDetached() + public boolean isDetached() { - return _state == State.DETACHED || _session.isEnded(); + synchronized (getLock()) + { + return _state == State.DETACHED || _session.isEnded(); + } } public SessionEndpoint getSession() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 317e184dae..fb4740dd4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -96,7 +96,6 @@ public class QueueRunner implements Runnable } finally { - CurrentActor.remove(); _scheduled.compareAndSet(RUNNING, IDLE); final long stateChangeCount = _queue.getStateChangeCount(); _lastRunAgain.set(runAgain); @@ -108,6 +107,7 @@ public class QueueRunner implements Runnable _queue.execute(this); } } + CurrentActor.remove(); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 50c0693f31..f757137cf1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -382,38 +382,58 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS // if not durable or close if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) { - _consumer.close(); + while(!_consumer.trySendLock()) + { + synchronized (endpoint.getLock()) + { + try + { + endpoint.getLock().wait(100); + } + catch (InterruptedException e) + { + } + } + } + try + { + _consumer.close(); - Modified state = new Modified(); - state.setDeliveryFailed(true); + Modified state = new Modified(); + state.setDeliveryFailed(true); - for(UnsettledAction action : _unsettledActionMap.values()) - { + for(UnsettledAction action : _unsettledActionMap.values()) + { - action.process(state,Boolean.TRUE); - } - _unsettledActionMap.clear(); + action.process(state,Boolean.TRUE); + } + _unsettledActionMap.clear(); - endpoint.close(); + endpoint.close(); - if(_destination instanceof ExchangeDestination - && (_durability == TerminusDurability.CONFIGURATION - || _durability == TerminusDurability.UNSETTLED_STATE)) - { - try + if(_destination instanceof ExchangeDestination + && (_durability == TerminusDurability.CONFIGURATION + || _durability == TerminusDurability.UNSETTLED_STATE)) { - _vhost.removeQueue((AMQQueue)_queue); + try + { + _vhost.removeQueue((AMQQueue)_queue); + } + catch (AccessControlException e) + { + //TODO + _logger.error("Error registering subscription", e); + } } - catch (AccessControlException e) + + if(_closeAction != null) { - //TODO - _logger.error("Error registering subscription", e); + _closeAction.run(); } } - - if(_closeAction != null) + finally { - _closeAction.run(); + _consumer.releaseSendLock(); } } else if(detach == null || detach.getError() != null) |
