summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-20 15:54:15 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-20 15:54:15 +0000
commit1080a1c708c09fc2524de3a7d16b46f275e07d24 (patch)
treec23b2f5a4b24c584b72d5b412126eb7336ec386e /qpid/java
parent1ee9cd5f7d43bd8c8117eab317842dd3783fac63 (diff)
downloadqpid-python-1080a1c708c09fc2524de3a7d16b46f275e07d24.tar.gz
QPID-5571 : Java Broker AMQP 1.0 deadlock on close consumer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570245 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java62
3 files changed, 52 insertions, 26 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index 301dd0695a..434f939a21 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -300,14 +300,20 @@ public abstract class LinkEndpoint<T extends LinkEventListener>
}
}
- public synchronized boolean isAttached()
+ public boolean isAttached()
{
- return _state == State.ATTACHED;
+ synchronized (getLock())
+ {
+ return _state == State.ATTACHED;
+ }
}
- public synchronized boolean isDetached()
+ public boolean isDetached()
{
- return _state == State.DETACHED || _session.isEnded();
+ synchronized (getLock())
+ {
+ return _state == State.DETACHED || _session.isEnded();
+ }
}
public SessionEndpoint getSession()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 317e184dae..fb4740dd4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -96,7 +96,6 @@ public class QueueRunner implements Runnable
}
finally
{
- CurrentActor.remove();
_scheduled.compareAndSet(RUNNING, IDLE);
final long stateChangeCount = _queue.getStateChangeCount();
_lastRunAgain.set(runAgain);
@@ -108,6 +107,7 @@ public class QueueRunner implements Runnable
_queue.execute(this);
}
}
+ CurrentActor.remove();
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 50c0693f31..f757137cf1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -382,38 +382,58 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
// if not durable or close
if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
- _consumer.close();
+ while(!_consumer.trySendLock())
+ {
+ synchronized (endpoint.getLock())
+ {
+ try
+ {
+ endpoint.getLock().wait(100);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ try
+ {
+ _consumer.close();
- Modified state = new Modified();
- state.setDeliveryFailed(true);
+ Modified state = new Modified();
+ state.setDeliveryFailed(true);
- for(UnsettledAction action : _unsettledActionMap.values())
- {
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
- action.process(state,Boolean.TRUE);
- }
- _unsettledActionMap.clear();
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
- endpoint.close();
+ endpoint.close();
- if(_destination instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
- {
- try
+ if(_destination instanceof ExchangeDestination
+ && (_durability == TerminusDurability.CONFIGURATION
+ || _durability == TerminusDurability.UNSETTLED_STATE))
{
- _vhost.removeQueue((AMQQueue)_queue);
+ try
+ {
+ _vhost.removeQueue((AMQQueue)_queue);
+ }
+ catch (AccessControlException e)
+ {
+ //TODO
+ _logger.error("Error registering subscription", e);
+ }
}
- catch (AccessControlException e)
+
+ if(_closeAction != null)
{
- //TODO
- _logger.error("Error registering subscription", e);
+ _closeAction.run();
}
}
-
- if(_closeAction != null)
+ finally
{
- _closeAction.run();
+ _consumer.releaseSendLock();
}
}
else if(detach == null || detach.getError() != null)