summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-12 12:00:06 +0000
committerKeith Wall <kwall@apache.org>2015-02-12 12:00:06 +0000
commit825aceb7e885c793309557a3a886f10c475c4c1c (patch)
tree11a5b5926a714c956f6cd77f56b373e96755df13 /qpid/java
parent5701b4ba67b8e475326acfc9f28735aead8d9dfc (diff)
downloadqpid-python-825aceb7e885c793309557a3a886f10c475c4c1c.tar.gz
broswer consumer close is now pulled by IO rather than pushed by queue, fixing browser tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java7
11 files changed, 36 insertions, 11 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index e32613d700..be4ac9d427 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -54,14 +54,18 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
while(hasMessagesToSend())
{
sendNextMessage();
}
+
+ processClosed();
}
+ protected abstract void processClosed();
+
@Override
public final boolean isSuspended()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 32b12d2a44..cef566793f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -33,7 +33,7 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
- void processPendingMessages();
+ void processPending();
enum State
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index a545ce6e10..d3ce911406 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && !sub.isSuspended())
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 53f1632933..caba0bd1d8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -249,7 +249,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 209f6663ec..a540318452 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -661,4 +661,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
return _unacknowledgedCount.longValue();
}
+
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 3659d6ce01..579c885053 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1140,7 +1140,7 @@ public class ServerSession extends Session
{
for(ConsumerTarget target : getSubscriptions())
{
- target.processPendingMessages();
+ target.processPending();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index be28024d13..c0cc7d55b0 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -3613,7 +3613,7 @@ public class AMQChannel
for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
{
- target.processPendingMessages();
+ target.processPending();
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index f9560fa0d2..2bf2fc6d27 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _needToClose = new AtomicBoolean();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -513,6 +514,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
{
if (isAutoClose())
{
+ _needToClose.set(true);
+ }
+ }
+
+ @Override
+ protected void processClosed()
+ {
+ if (_needToClose.get() && getState() != State.CLOSED)
+ {
close();
confirmAutoClose();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index ebd23f31ae..f06f70b362 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -535,4 +535,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return 0;
}
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index dd03469d0f..0b613d9a5a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -905,7 +905,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
for(Consumer<?> consumer : getConsumers())
{
- ((ConsumerImpl)consumer).getTarget().processPendingMessages();
+ ((ConsumerImpl)consumer).getTarget().processPending();
}
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 6b6b4a7b3c..8a4e22783f 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -148,7 +148,6 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);
-
// Browse the queue to get a second opinion
int msgCount = 0;
Enumeration msgs = queueBrowser.getEnumeration();
@@ -268,7 +267,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
//validate all browsers get right message count.
for (int count = 0; count < browserEnumerationCount; count++)
{
- assertEquals(msgCount[count], expectedMessages);
+ assertEquals("Unexpected count for browser " + count, expectedMessages, msgCount[count]);
}
try
@@ -317,7 +316,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
//Close this new connection
connection.close();
- _logger.info("All messages recevied from queue");
+ _logger.info("All messages received from queue");
//ensure no message left.
checkQueueDepth(0);
@@ -344,7 +343,7 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
/*
* Test Messages Remain on Queue
- * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there
+ * Create a queue and send messages to it. Browse them and then receive them all to verify they were still there
*
*/
public void testQueueBrowserMsgsRemainOnQueue() throws Exception