diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-09-18 16:12:46 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-09-18 16:12:46 +0000 |
| commit | 09f60acd6ba474bfeed068f10d966938f806ff77 (patch) | |
| tree | 6f3eaf7a84674f5f8bcb1c7b3211770d0b245907 /qpid/java | |
| parent | b208766dcaf114eac162d6f230fb05370b01e04b (diff) | |
| download | qpid-python-09f60acd6ba474bfeed068f10d966938f806ff77.tar.gz | |
QPID-1286: make sure priority queues don't mess with deleted subscriptions
AMQPriorityQueue: don't advance deleted subscriptions
AMQPriorityQueueTest: Add test class for priority queues
SimpleAMQQueueTest: Add more tests
PriorityTest: Check for more message orders
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
4 files changed, 406 insertions, 76 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index e14ed0f41d..d7aef34a10 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -52,8 +52,12 @@ public class AMQPriorityQueue extends SimpleAMQQueue while(subIter.advance() && !entry.isAcquired()) { final Subscription subscription = subIter.getNode().getSubscription(); + if (subIter.getNode().isDeleted()) + { + continue; + } QueueEntry subnode = subscription.getLastSeenEntry(); - while((entry.compareTo(subnode) < 0) && !entry.isAcquired()) + while(entry.compareTo(subnode) < 0 && !entry.isAcquired()) { if(subscription.setLastSeenEntry(subnode,entry)) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java new file mode 100644 index 0000000000..e479a1e489 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -0,0 +1,90 @@ +package org.apache.qpid.server.queue; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.ArrayList; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; + +public class AMQPriorityQueueTest extends SimpleAMQQueueTest +{ + + @Override + protected void setUp() throws Exception + { + arguments = new FieldTable(); + arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + super.setUp(); + } + + public void testPriorityOrdering() throws AMQException, InterruptedException + { + + // Enqueue messages in order + queue.enqueue(null, createMessage(1L, (byte) 10)); + queue.enqueue(null, createMessage(2L, (byte) 4)); + queue.enqueue(null, createMessage(3L, (byte) 0)); + + // Enqueue messages in reverse order + queue.enqueue(null, createMessage(4L, (byte) 0)); + queue.enqueue(null, createMessage(5L, (byte) 4)); + queue.enqueue(null, createMessage(6L, (byte) 10)); + + // Enqueue messages out of order + queue.enqueue(null, createMessage(7L, (byte) 4)); + queue.enqueue(null, createMessage(8L, (byte) 10)); + queue.enqueue(null, createMessage(9L, (byte) 0)); + + // Register subscriber + queue.registerSubscription(subscription, false); + Thread.sleep(150); + + ArrayList<QueueEntry> msgs = subscription.getMessages(); + assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId()); + assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId()); + assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId()); + + assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId()); + assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId()); + assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId()); + + assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId()); + assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId()); + assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId()); + } + + protected AMQMessage createMessage(Long id, byte i) throws AMQException + { + AMQMessage msg = super.createMessage(id); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setPriority(i); + msg.getContentHeaderBody().properties = props; + return msg; + } + + protected AMQMessage createMessage(Long id) throws AMQException + { + return createMessage(id, (byte) 0); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 3aa6e5a36c..e5a287c037 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -23,29 +23,37 @@ package org.apache.qpid.server.queue; import java.util.List; +import junit.framework.TestCase; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.registry.ApplicationRegistry; - -import junit.framework.TestCase; public class SimpleAMQQueueTest extends TestCase { - private SimpleAMQQueue _queue; - private VirtualHost _virtualHost; - private MessageStore store = new TestableMemoryMessageStore(); - private TransactionalContext ctx = new NonTransactionalContext(store, new StoreContext(), null, null); - private MessageHandleFactory factory = new MessageHandleFactory(); - + protected SimpleAMQQueue queue; + protected VirtualHost virtualHost; + protected MessageStore store = new TestableMemoryMessageStore(); + protected AMQShortString qname = new AMQShortString("qname"); + protected AMQShortString owner = new AMQShortString("owner"); + protected AMQShortString routingKey = new AMQShortString("routing key"); + protected DirectExchange exchange = new DirectExchange(); + protected MockSubscription subscription = new MockSubscription(); + protected FieldTable arguments = null; + MessagePublishInfo info = new MessagePublishInfo() { @@ -74,7 +82,7 @@ public class SimpleAMQQueueTest extends TestCase return null; } }; - + @Override protected void setUp() throws Exception { @@ -82,30 +90,198 @@ public class SimpleAMQQueueTest extends TestCase //Create Application Registry for test ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); - AMQShortString qname = new AMQShortString("qname"); - AMQShortString owner = new AMQShortString("owner"); - _virtualHost = new VirtualHost("vhost", store); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, _virtualHost, null); - - applicationRegistry .getVirtualHostRegistry().registerVirtualHost(_virtualHost); + virtualHost = new VirtualHost("vhost", store); + applicationRegistry.getVirtualHostRegistry().registerVirtualHost(virtualHost); + + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, virtualHost, arguments); } @Override protected void tearDown() { + queue.stop(); ApplicationRegistry.remove(1); } + public void testCreateQueue() throws AMQException + { + queue.stop(); + try { + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, owner, false, virtualHost, arguments ); + assertNull("Queue was created", queue); + } + catch (IllegalArgumentException e) + { + assertTrue("Exception was not about missing name", + e.getMessage().contains("name")); + } + + try { + queue = new SimpleAMQQueue(qname, false, owner, false, null); + assertNull("Queue was created", queue); + } + catch (IllegalArgumentException e) + { + assertTrue("Exception was not about missing vhost", + e.getMessage().contains("Host")); + } + + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(qname, false, owner, false, + virtualHost, arguments); + assertNotNull("Queue was not created", queue); + } + + public void testGetVirtualHost() + { + assertEquals("Virtual host was wrong", virtualHost, queue.getVirtualHost()); + } + + public void testBinding() + { + try + { + queue.bind(exchange, routingKey, null); + assertTrue("Routing key was not bound", + exchange.getBindings().containsKey(routingKey)); + assertEquals("Queue was not bound to key", + exchange.getBindings().get(routingKey).get(0), + queue); + assertEquals("Exchange binding count", 1, + queue.getExchangeBindings().size()); + assertEquals("Wrong exchange bound", routingKey, + queue.getExchangeBindings().get(0).getRoutingKey()); + assertEquals("Wrong exchange bound", exchange, + queue.getExchangeBindings().get(0).getExchange()); + + queue.unBind(exchange, routingKey, null); + assertFalse("Routing key was still bound", + exchange.getBindings().containsKey(routingKey)); + assertNull("Routing key was not empty", + exchange.getBindings().get(routingKey)); + } + catch (AMQException e) + { + assertNull("Unexpected exception", e); + } + } + + public void testSubscription() throws AMQException + { + // Check adding a subscription adds it to the queue + queue.registerSubscription(subscription, false); + assertEquals("Subscription did not get queue", queue, + subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, + queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + queue.getActiveConsumerCount()); + + // Check sending a message ends up with the subscriber + AMQMessage messageA = createMessage(new Long(24)); + queue.enqueue(null, messageA); + assertEquals(messageA, subscription.getLastSeenEntry().getMessage()); + + // Check removing the subscription removes it's information from the queue + queue.unregisterSubscription(subscription); + assertTrue("Subscription still had queue", subscription.isClosed()); + assertFalse("Queue still has consumer", 1 == queue.getConsumerCount()); + assertFalse("Queue still has active consumer", + 1 == queue.getActiveConsumerCount()); + + AMQMessage messageB = createMessage(new Long (25)); + queue.enqueue(null, messageB); + QueueEntry entry = subscription.getLastSeenEntry(); + assertNull(entry); + } + + public void testQueueNoSubscriber() throws AMQException, InterruptedException + { + AMQMessage messageA = createMessage(new Long(24)); + queue.enqueue(null, messageA); + queue.registerSubscription(subscription, false); + Thread.sleep(150); + assertEquals(messageA, subscription.getLastSeenEntry().getMessage()); + } + + public void testExclusiveConsumer() throws AMQException + { + // Check adding an exclusive subscription adds it to the queue + queue.registerSubscription(subscription, true); + assertEquals("Subscription did not get queue", queue, + subscription.getQueue()); + assertEquals("Queue does not have consumer", 1, + queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + queue.getActiveConsumerCount()); + + // Check sending a message ends up with the subscriber + AMQMessage messageA = createMessage(new Long(24)); + queue.enqueue(null, messageA); + assertEquals(messageA, subscription.getLastSeenEntry().getMessage()); + + // Check we cannot add a second subscriber to the queue + Subscription subB = new MockSubscription(); + Exception ex = null; + try + { + queue.registerSubscription(subB, false); + } + catch (AMQException e) + { + ex = e; + } + assertNotNull(ex); + assertTrue(ex instanceof AMQException); + + // Check we cannot add an exclusive subscriber to a queue with an + // existing subscription + queue.unregisterSubscription(subscription); + queue.registerSubscription(subscription, false); + try + { + queue.registerSubscription(subB, true); + } + catch (AMQException e) + { + ex = e; + } + assertNotNull(ex); + } + + public void testAutoDeleteQueue() throws Exception + { + queue.stop(); + queue = new SimpleAMQQueue(qname, false, owner, true, virtualHost); + queue.registerSubscription(subscription, false); + AMQMessage message = createMessage(new Long(25)); + queue.enqueue(null, message); + queue.unregisterSubscription(subscription); + assertTrue("Queue was not deleted when subscription was removed", + queue.isDeleted()); + } + + public void testResend() throws Exception + { + queue.registerSubscription(subscription, false); + Long id = new Long(26); + AMQMessage message = createMessage(id); + queue.enqueue(null, message); + QueueEntry entry = subscription.getLastSeenEntry(); + entry.setRedelivered(true); + queue.resend(entry, subscription); + + } + public void testGetFirstMessageId() throws Exception { // Create message Long messageId = new Long(23); - AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + queue.enqueue(null, message); // Get message id - Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); + Long testmsgid = queue.getMessagesOnTheQueue(1).get(0); // Check message id assertEquals("Message ID was wrong", messageId, testmsgid); @@ -117,12 +293,12 @@ public class SimpleAMQQueueTest extends TestCase { // Create message Long messageId = new Long(i); - AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + queue.enqueue(null, message); } // Get message ids - List<Long> msgids = _queue.getMessagesOnTheQueue(5); + List<Long> msgids = queue.getMessagesOnTheQueue(5); // Check message id for (int i = 0; i < 5; i++) @@ -138,12 +314,12 @@ public class SimpleAMQQueueTest extends TestCase { // Create message Long messageId = new Long(i); - AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + queue.enqueue(null, message); } // Get message ids - List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); + List<Long> msgids = queue.getMessagesOnTheQueue(5, 5); // Check message id for (int i = 0; i < 5; i++) @@ -210,4 +386,10 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Wrong count for message with tag " + _tag, expected, _count); } } + + protected AMQMessage createMessage(Long id) throws AMQException + { + AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); + return messageA; + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 0dbf95052f..b8e5980f26 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -52,9 +52,19 @@ public class PriorityTest extends TestCase protected final String VHOST = "/test"; protected final String QUEUE = "PriorityQueue"; - private static final int MSG_COUNT = 50; + private Context context = null; + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + protected void setUp() throws Exception { super.setUp(); @@ -64,7 +74,21 @@ public class PriorityTest extends TestCase TransportConnection.createVMBroker(1); } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + context = factory.getInitialContext(env); + producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerConnection.start(); + + consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } private boolean usingInVMBroker() @@ -74,6 +98,8 @@ public class PriorityTest extends TestCase protected void tearDown() throws Exception { + producerConnection.close(); + consumerConnection.close(); if (usingInVMBroker()) { TransportConnection.killAllVMBrokers(); @@ -83,65 +109,25 @@ public class PriorityTest extends TestCase public void testPriority() throws JMSException, NamingException, AMQException { - InitialContextFactory factory = new PropertiesFileInitialContextFactory(); - - Hashtable<String, String> env = new Hashtable<String, String>(); - - env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); - env.put("queue.queue", QUEUE); - - Context context = factory.getInitialContext(env); - - Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",10); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - - Queue queue = new AMQQueue("amq.direct",QUEUE); - + queue = new AMQQueue("amq.direct",QUEUE); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - - - - - - - producerConnection.start(); - - - MessageProducer producer = producerSession.createProducer(queue); - - - - + producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) { producer.setPriority(msg % 10); producer.send(nextMessage(msg, false, producerSession, producer)); } - producer.close(); producerSession.close(); producerConnection.close(); - - Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); - - - - + consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - //Receive Message 0 - StringBuilder buf = new StringBuilder(); int receivedCount = 0; Message previous = null; int messageCount = 0; @@ -158,10 +144,78 @@ public class PriorityTest extends TestCase } assertEquals("Incorrect number of message received", 50, receivedCount); - - producerSession.close(); - producer.close(); - + } + + public void testOddOrdering() throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",3); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + // In order ABC + producer.setPriority(9); + producer.send(nextMessage(1, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(2, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(3, false, producerSession, producer)); + + // Out of order BAC + producer.setPriority(4); + producer.send(nextMessage(4, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(5, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(6, false, producerSession, producer)); + + // Out of order BCA + producer.setPriority(4); + producer.send(nextMessage(7, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(8, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(9, false, producerSession, producer)); + + // Reverse order CBA + producer.setPriority(1); + producer.send(nextMessage(10, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(11, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(12, false, producerSession, producer)); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message msg = consumer.receive(500); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(5, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(9, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(12, msg.getIntProperty("msg")); + + msg = consumer.receive(500); + assertEquals(2, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(4, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(7, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(11, msg.getIntProperty("msg")); + + msg = consumer.receive(500); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(6, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(8, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(10, msg.getIntProperty("msg")); } private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException |
