summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java24
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java16
2 files changed, 28 insertions, 12 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5d90cfab19..b448919188 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -92,15 +92,23 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
- {
- for(Consumer<?> consumer : session.getConsumers())
- {
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
- }
- }
+ for (Consumer<?> consumer : session.getConsumers())
+ {
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
+ {
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
+ }
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 28e15f04d3..0cb83d33a7 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -175,13 +175,21 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ for(Consumer<?> consumer : session.getConsumers())
{
- for(Consumer<?> consumer : session.getConsumers())
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
{
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
}
}
}