diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-12 15:18:16 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-12 15:18:16 +0000 |
| commit | e243745d439671418016a2be1570209269b45070 (patch) | |
| tree | 7f17111b385b7759869e2e5594e591979097cc6f /qpid/java/broker-plugins | |
| parent | 825aceb7e885c793309557a3a886f10c475c4c1c (diff) | |
| download | qpid-python-e243745d439671418016a2be1570209269b45070.tar.gz | |
0-10 queue browser fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
3 files changed, 14 insertions, 9 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 5f99ddf56c..0d1fcb008a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -61,7 +61,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + private final AtomicBoolean _stateChanged = new AtomicBoolean(); private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); @@ -81,13 +82,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + if(!messageAssignmentSuspended) { for(AMQSessionModel<?,?> session : _connection.getSessionModels()) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index f8ca3077b4..88412c3b70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -209,19 +209,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private long _maxMessageSize; private volatile boolean _transportBlockedForWriting; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); if(!messageAssignmentSuspended) { for(AMQSessionModel<?,?> session : getSessionModels()) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 3d7712d1ad..0078235990 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); @@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); if(!messageAssignmentSuspended) { |
