summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:03:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:03:45 +0000
commitd2e8a2447a4ce4022f2a640d02366bcbf2fa98e4 (patch)
tree89bb6e1668093ef7b15894c4362c9fecfcaf7324
parentc70428e05dabc3093343204b1211e22ecf5e5ed4 (diff)
downloadqpid-python-d2e8a2447a4ce4022f2a640d02366bcbf2fa98e4.tar.gz
QPID-6429 : ensure that when message suspension is set, all targets have finished any in flight deliveries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1667144 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java24
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java16
2 files changed, 28 insertions, 12 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5d90cfab19..b448919188 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -92,15 +92,23 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
- {
- for(Consumer<?> consumer : session.getConsumers())
- {
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
- }
- }
+ for (Consumer<?> consumer : session.getConsumers())
+ {
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
+ {
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
+ }
+ }
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 28e15f04d3..0cb83d33a7 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -175,13 +175,21 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ for(Consumer<?> consumer : session.getConsumers())
{
- for(Consumer<?> consumer : session.getConsumers())
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
{
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
}
}
}