diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-12 12:00:06 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-12 12:00:06 +0000 |
| commit | 825aceb7e885c793309557a3a886f10c475c4c1c (patch) | |
| tree | 11a5b5926a714c956f6cd77f56b373e96755df13 /qpid/java | |
| parent | 5701b4ba67b8e475326acfc9f28735aead8d9dfc (diff) | |
| download | qpid-python-825aceb7e885c793309557a3a886f10c475c4c1c.tar.gz | |
broswer consumer close is now pulled by IO rather than pushed by queue, fixing browser tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 36 insertions, 11 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index e32613d700..be4ac9d427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -54,14 +54,18 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } @Override - public void processPendingMessages() + public void processPending() { while(hasMessagesToSend()) { sendNextMessage(); } + + processClosed(); } + protected abstract void processClosed(); + @Override public final boolean isSuspended() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 32b12d2a44..cef566793f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -33,7 +33,7 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); - void processPendingMessages(); + void processPending(); enum State { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index a545ce6e10..d3ce911406 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (consumerDone) { sub.flushBatched(); - if (lastLoop && !sub.isSuspended()) + boolean noMore = getNextAvailableEntry(sub) == null; + if (lastLoop && noMore) { sub.queueEmpty(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 53f1632933..caba0bd1d8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -249,7 +249,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void processPendingMessages() + public void processPending() { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 209f6663ec..a540318452 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -661,4 +661,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { return _unacknowledgedCount.longValue(); } + + @Override + protected void processClosed() + { + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 3659d6ce01..579c885053 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1140,7 +1140,7 @@ public class ServerSession extends Session { for(ConsumerTarget target : getSubscriptions()) { - target.processPendingMessages(); + target.processPending(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index be28024d13..c0cc7d55b0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -3613,7 +3613,7 @@ public class AMQChannel for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) { - target.processPendingMessages(); + target.processPending(); } } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index f9560fa0d2..2bf2fc6d27 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); + private final AtomicBoolean _needToClose = new AtomicBoolean(); public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -513,6 +514,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { if (isAutoClose()) { + _needToClose.set(true); + } + } + + @Override + protected void processClosed() + { + if (_needToClose.get() && getState() != State.CLOSED) + { close(); confirmAutoClose(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index ebd23f31ae..f06f70b362 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -535,4 +535,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return 0; } + @Override + protected void processClosed() + { + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index dd03469d0f..0b613d9a5a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -905,7 +905,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio for(Consumer<?> consumer : getConsumers()) { - ((ConsumerImpl)consumer).getTarget().processPendingMessages(); + ((ConsumerImpl)consumer).getTarget().processPending(); } } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 6b6b4a7b3c..8a4e22783f 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -148,7 +148,6 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth); - // Browse the queue to get a second opinion int msgCount = 0; Enumeration msgs = queueBrowser.getEnumeration(); @@ -268,7 +267,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase //validate all browsers get right message count. for (int count = 0; count < browserEnumerationCount; count++) { - assertEquals(msgCount[count], expectedMessages); + assertEquals("Unexpected count for browser " + count, expectedMessages, msgCount[count]); } try @@ -317,7 +316,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase //Close this new connection connection.close(); - _logger.info("All messages recevied from queue"); + _logger.info("All messages received from queue"); //ensure no message left. checkQueueDepth(0); @@ -344,7 +343,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase /* * Test Messages Remain on Queue - * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there + * Create a queue and send messages to it. Browse them and then receive them all to verify they were still there * */ public void testQueueBrowserMsgsRemainOnQueue() throws Exception |
