summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-12 15:18:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-12 15:18:16 +0000
commite243745d439671418016a2be1570209269b45070 (patch)
tree7f17111b385b7759869e2e5594e591979097cc6f /qpid/java/broker-plugins
parent825aceb7e885c793309557a3a886f10c475c4c1c (diff)
downloadqpid-python-e243745d439671418016a2be1570209269b45070.tar.gz
0-10 queue browser fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java7
3 files changed, 14 insertions, 9 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5f99ddf56c..0d1fcb008a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -61,7 +61,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
@@ -81,13 +82,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : _connection.getSessionModels())
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index f8ca3077b4..88412c3b70 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -209,19 +209,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : getSessionModels())
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 3d7712d1ad..0078235990 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{