summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-09-18 16:12:46 +0000
committerAidan Skinner <aidan@apache.org>2008-09-18 16:12:46 +0000
commit09f60acd6ba474bfeed068f10d966938f806ff77 (patch)
tree6f3eaf7a84674f5f8bcb1c7b3211770d0b245907 /qpid/java/broker
parentb208766dcaf114eac162d6f230fb05370b01e04b (diff)
downloadqpid-python-09f60acd6ba474bfeed068f10d966938f806ff77.tar.gz
QPID-1286: make sure priority queues don't mess with deleted subscriptions
AMQPriorityQueue: don't advance deleted subscriptions AMQPriorityQueueTest: Add test class for priority queues SimpleAMQQueueTest: Add more tests PriorityTest: Check for more message orders git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java90
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java236
3 files changed, 304 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index e14ed0f41d..d7aef34a10 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -52,8 +52,12 @@ public class AMQPriorityQueue extends SimpleAMQQueue
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
+ if (subIter.getNode().isDeleted())
+ {
+ continue;
+ }
QueueEntry subnode = subscription.getLastSeenEntry();
- while((entry.compareTo(subnode) < 0) && !entry.isAcquired())
+ while(entry.compareTo(subnode) < 0 && !entry.isAcquired())
{
if(subscription.setLastSeenEntry(subnode,entry))
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
new file mode 100644
index 0000000000..e479a1e489
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -0,0 +1,90 @@
+package org.apache.qpid.server.queue;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.ArrayList;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+
+public class AMQPriorityQueueTest extends SimpleAMQQueueTest
+{
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ arguments = new FieldTable();
+ arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ super.setUp();
+ }
+
+ public void testPriorityOrdering() throws AMQException, InterruptedException
+ {
+
+ // Enqueue messages in order
+ queue.enqueue(null, createMessage(1L, (byte) 10));
+ queue.enqueue(null, createMessage(2L, (byte) 4));
+ queue.enqueue(null, createMessage(3L, (byte) 0));
+
+ // Enqueue messages in reverse order
+ queue.enqueue(null, createMessage(4L, (byte) 0));
+ queue.enqueue(null, createMessage(5L, (byte) 4));
+ queue.enqueue(null, createMessage(6L, (byte) 10));
+
+ // Enqueue messages out of order
+ queue.enqueue(null, createMessage(7L, (byte) 4));
+ queue.enqueue(null, createMessage(8L, (byte) 10));
+ queue.enqueue(null, createMessage(9L, (byte) 0));
+
+ // Register subscriber
+ queue.registerSubscription(subscription, false);
+ Thread.sleep(150);
+
+ ArrayList<QueueEntry> msgs = subscription.getMessages();
+ assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
+ assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
+ assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+
+ assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
+ assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
+ assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+
+ assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
+ assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
+ assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ }
+
+ protected AMQMessage createMessage(Long id, byte i) throws AMQException
+ {
+ AMQMessage msg = super.createMessage(id);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setPriority(i);
+ msg.getContentHeaderBody().properties = props;
+ return msg;
+ }
+
+ protected AMQMessage createMessage(Long id) throws AMQException
+ {
+ return createMessage(id, (byte) 0);
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 3aa6e5a36c..e5a287c037 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -23,29 +23,37 @@ package org.apache.qpid.server.queue;
import java.util.List;
+import junit.framework.TestCase;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
-import junit.framework.TestCase;
public class SimpleAMQQueueTest extends TestCase
{
- private SimpleAMQQueue _queue;
- private VirtualHost _virtualHost;
- private MessageStore store = new TestableMemoryMessageStore();
- private TransactionalContext ctx = new NonTransactionalContext(store, new StoreContext(), null, null);
- private MessageHandleFactory factory = new MessageHandleFactory();
-
+ protected SimpleAMQQueue queue;
+ protected VirtualHost virtualHost;
+ protected MessageStore store = new TestableMemoryMessageStore();
+ protected AMQShortString qname = new AMQShortString("qname");
+ protected AMQShortString owner = new AMQShortString("owner");
+ protected AMQShortString routingKey = new AMQShortString("routing key");
+ protected DirectExchange exchange = new DirectExchange();
+ protected MockSubscription subscription = new MockSubscription();
+ protected FieldTable arguments = null;
+
MessagePublishInfo info = new MessagePublishInfo()
{
@@ -74,7 +82,7 @@ public class SimpleAMQQueueTest extends TestCase
return null;
}
};
-
+
@Override
protected void setUp() throws Exception
{
@@ -82,30 +90,198 @@ public class SimpleAMQQueueTest extends TestCase
//Create Application Registry for test
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1);
- AMQShortString qname = new AMQShortString("qname");
- AMQShortString owner = new AMQShortString("owner");
- _virtualHost = new VirtualHost("vhost", store);
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, _virtualHost, null);
-
- applicationRegistry .getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+ virtualHost = new VirtualHost("vhost", store);
+ applicationRegistry.getVirtualHostRegistry().registerVirtualHost(virtualHost);
+
+ queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, virtualHost, arguments);
}
@Override
protected void tearDown()
{
+ queue.stop();
ApplicationRegistry.remove(1);
}
+ public void testCreateQueue() throws AMQException
+ {
+ queue.stop();
+ try {
+ queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, owner, false, virtualHost, arguments );
+ assertNull("Queue was created", queue);
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue("Exception was not about missing name",
+ e.getMessage().contains("name"));
+ }
+
+ try {
+ queue = new SimpleAMQQueue(qname, false, owner, false, null);
+ assertNull("Queue was created", queue);
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
+ }
+
+ queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false,
+ virtualHost, arguments);
+ assertNotNull("Queue was not created", queue);
+ }
+
+ public void testGetVirtualHost()
+ {
+ assertEquals("Virtual host was wrong", virtualHost, queue.getVirtualHost());
+ }
+
+ public void testBinding()
+ {
+ try
+ {
+ queue.bind(exchange, routingKey, null);
+ assertTrue("Routing key was not bound",
+ exchange.getBindings().containsKey(routingKey));
+ assertEquals("Queue was not bound to key",
+ exchange.getBindings().get(routingKey).get(0),
+ queue);
+ assertEquals("Exchange binding count", 1,
+ queue.getExchangeBindings().size());
+ assertEquals("Wrong exchange bound", routingKey,
+ queue.getExchangeBindings().get(0).getRoutingKey());
+ assertEquals("Wrong exchange bound", exchange,
+ queue.getExchangeBindings().get(0).getExchange());
+
+ queue.unBind(exchange, routingKey, null);
+ assertFalse("Routing key was still bound",
+ exchange.getBindings().containsKey(routingKey));
+ assertNull("Routing key was not empty",
+ exchange.getBindings().get(routingKey));
+ }
+ catch (AMQException e)
+ {
+ assertNull("Unexpected exception", e);
+ }
+ }
+
+ public void testSubscription() throws AMQException
+ {
+ // Check adding a subscription adds it to the queue
+ queue.registerSubscription(subscription, false);
+ assertEquals("Subscription did not get queue", queue,
+ subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ queue.getActiveConsumerCount());
+
+ // Check sending a message ends up with the subscriber
+ AMQMessage messageA = createMessage(new Long(24));
+ queue.enqueue(null, messageA);
+ assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+
+ // Check removing the subscription removes it's information from the queue
+ queue.unregisterSubscription(subscription);
+ assertTrue("Subscription still had queue", subscription.isClosed());
+ assertFalse("Queue still has consumer", 1 == queue.getConsumerCount());
+ assertFalse("Queue still has active consumer",
+ 1 == queue.getActiveConsumerCount());
+
+ AMQMessage messageB = createMessage(new Long (25));
+ queue.enqueue(null, messageB);
+ QueueEntry entry = subscription.getLastSeenEntry();
+ assertNull(entry);
+ }
+
+ public void testQueueNoSubscriber() throws AMQException, InterruptedException
+ {
+ AMQMessage messageA = createMessage(new Long(24));
+ queue.enqueue(null, messageA);
+ queue.registerSubscription(subscription, false);
+ Thread.sleep(150);
+ assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+ }
+
+ public void testExclusiveConsumer() throws AMQException
+ {
+ // Check adding an exclusive subscription adds it to the queue
+ queue.registerSubscription(subscription, true);
+ assertEquals("Subscription did not get queue", queue,
+ subscription.getQueue());
+ assertEquals("Queue does not have consumer", 1,
+ queue.getConsumerCount());
+ assertEquals("Queue does not have active consumer", 1,
+ queue.getActiveConsumerCount());
+
+ // Check sending a message ends up with the subscriber
+ AMQMessage messageA = createMessage(new Long(24));
+ queue.enqueue(null, messageA);
+ assertEquals(messageA, subscription.getLastSeenEntry().getMessage());
+
+ // Check we cannot add a second subscriber to the queue
+ Subscription subB = new MockSubscription();
+ Exception ex = null;
+ try
+ {
+ queue.registerSubscription(subB, false);
+ }
+ catch (AMQException e)
+ {
+ ex = e;
+ }
+ assertNotNull(ex);
+ assertTrue(ex instanceof AMQException);
+
+ // Check we cannot add an exclusive subscriber to a queue with an
+ // existing subscription
+ queue.unregisterSubscription(subscription);
+ queue.registerSubscription(subscription, false);
+ try
+ {
+ queue.registerSubscription(subB, true);
+ }
+ catch (AMQException e)
+ {
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
+ public void testAutoDeleteQueue() throws Exception
+ {
+ queue.stop();
+ queue = new SimpleAMQQueue(qname, false, owner, true, virtualHost);
+ queue.registerSubscription(subscription, false);
+ AMQMessage message = createMessage(new Long(25));
+ queue.enqueue(null, message);
+ queue.unregisterSubscription(subscription);
+ assertTrue("Queue was not deleted when subscription was removed",
+ queue.isDeleted());
+ }
+
+ public void testResend() throws Exception
+ {
+ queue.registerSubscription(subscription, false);
+ Long id = new Long(26);
+ AMQMessage message = createMessage(id);
+ queue.enqueue(null, message);
+ QueueEntry entry = subscription.getLastSeenEntry();
+ entry.setRedelivered(true);
+ queue.resend(entry, subscription);
+
+ }
+
public void testGetFirstMessageId() throws Exception
{
// Create message
Long messageId = new Long(23);
- AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+ AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ queue.enqueue(null, message);
// Get message id
- Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
+ Long testmsgid = queue.getMessagesOnTheQueue(1).get(0);
// Check message id
assertEquals("Message ID was wrong", messageId, testmsgid);
@@ -117,12 +293,12 @@ public class SimpleAMQQueueTest extends TestCase
{
// Create message
Long messageId = new Long(i);
- AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+ AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ queue.enqueue(null, message);
}
// Get message ids
- List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+ List<Long> msgids = queue.getMessagesOnTheQueue(5);
// Check message id
for (int i = 0; i < 5; i++)
@@ -138,12 +314,12 @@ public class SimpleAMQQueueTest extends TestCase
{
// Create message
Long messageId = new Long(i);
- AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext());
+ AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ queue.enqueue(null, message);
}
// Get message ids
- List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
+ List<Long> msgids = queue.getMessagesOnTheQueue(5, 5);
// Check message id
for (int i = 0; i < 5; i++)
@@ -210,4 +386,10 @@ public class SimpleAMQQueueTest extends TestCase
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
+
+ protected AMQMessage createMessage(Long id) throws AMQException
+ {
+ AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+ return messageA;
+ }
}