diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-06-07 11:18:41 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-06-07 11:18:41 +0000 |
| commit | 67788db66c38665e454ecbf977e1d376bf7eea97 (patch) | |
| tree | fb6ee01c54b5350abc109315ae03b6e254cc22b1 /qpid/java/broker/src/test | |
| parent | 174789ad53876088c38ea365a65ae54ad2c48867 (diff) | |
| download | qpid-python-67788db66c38665e454ecbf977e1d376bf7eea97.tar.gz | |
QPID-3219: update handling of QueueEntries to exclude use of entries in the intermediate 'dequeued' state, simplify logic in general.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1132959 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
5 files changed, 821 insertions, 1 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 9e831b2a8e..ae58a52046 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -482,6 +482,18 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + @Override + public boolean isDequeued() + { + return false; + } + + @Override + public boolean isDispensed() + { + return false; + } }; if(action != null) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 5bdbe2c68e..0ba8eb4792 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -231,4 +231,16 @@ public class MockQueueEntry implements QueueEntry _message = msg; } + @Override + public boolean isDequeued() + { + return false; + } + + @Override + public boolean isDispensed() + { + return false; + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java new file mode 100644 index 0000000000..0899f25cc5 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import java.lang.reflect.Field; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry.EntryState; +import org.apache.qpid.server.subscription.MockSubscription; + +/** + * Tests for {@link QueueEntryImpl} + * + */ +public class QueueEntryImplTest extends TestCase +{ + // tested entry + private QueueEntryImpl _queueEntry; + + public void setUp() throws Exception + { + AMQMessage message = new MockAMQMessage(1); + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + _queueEntry = new QueueEntryImpl(queueEntryList, message, 1); + } + + public void testAquire() + { + assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", + _queueEntry.isAvailable()); + acquire(); + } + + public void testDequeue() + { + dequeue(); + } + + public void testDelete() + { + delete(); + } + + /** + * Tests release method for entry in acquired state. + * <p> + * Entry in state ACQUIRED should be released and its status should be + * changed to AVAILABLE. + */ + public void testReleaseAquired() + { + acquire(); + _queueEntry.release(); + assertTrue("Queue entry should be in AVAILABLE state after invoking of release method", + _queueEntry.isAvailable()); + } + + /** + * Tests release method for entry in dequeued state. + * <p> + * Invoking release on dequeued entry should not have any effect on its + * state. + */ + public void testReleaseDequeued() + { + dequeue(); + _queueEntry.release(); + EntryState state = getState(); + assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * Tests release method for entry in deleted state. + * <p> + * Invoking release on deleted entry should not have any effect on its + * state. + */ + public void testReleaseDeleted() + { + delete(); + _queueEntry.release(); + assertTrue("Invoking of release on entry in DELETED state should not have any effect", + _queueEntry.isDeleted()); + } + + /** + * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. + */ + public void testGetNext() + { + int numberOfEntries = 5; + QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); + entries[i] = entry; + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries ; i++) + { + QueueEntryImpl queueEntry = entries[i]; + QueueEntryImpl next = queueEntry.getNext(); + if (i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + QueueEntryImpl next = entries[0].getNext(); + assertEquals("expected forth entry",entries[3], next); + next = next.getNext(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNext(); + assertNull("The next entry after the last should be null", next); + } + /** + * A helper method to put tested object into deleted state and assert the state + */ + private void delete() + { + _queueEntry.delete(); + assertTrue("Queue entry should be in DELETED state after invoking of delete method", + _queueEntry.isDeleted()); + } + + /** + * A helper method to put tested entry into dequeue state and assert the sate + */ + private void dequeue() + { + acquire(); + _queueEntry.dequeue(); + EntryState state = getState(); + assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * A helper method to put tested entry into acquired state and assert the sate + */ + private void acquire() + { + _queueEntry.acquire(new MockSubscription()); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + } + + /** + * A helper method to get entry state + * + * @return entry state + */ + private EntryState getState() + { + EntryState state = null; + try + { + Field f = QueueEntryImpl.class.getDeclaredField("_state"); + f.setAccessible(true); + state = (EntryState) f.get(_queueEntry); + } + catch (Exception e) + { + fail("Failure to get a state field: " + e.getMessage()); + } + return state; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index abe2d1728f..4c21f38363 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,13 +36,16 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; +import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -51,6 +54,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -735,6 +740,533 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase verifyRecievedMessages(msgListSub3, sub3.getMessages()); } + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue()} + */ + public void testGetMessagesOnTheQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)} + */ + public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue with filter accepting all available messages + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() + { + + @Override + public boolean accept(QueueEntry entry) + { + return true; + } + + @Override + public boolean filterComplete() + { + return false; + } + }); + + // assert entries on the queue + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not copied as part of invocation of + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testCopyMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // copy messages into another queue + _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not moved as part of invocation of + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testMovedMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // move messages into another queue + _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that messages in given range including dequeued one are deleted + * from the queue on invocation of + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + */ + public void testRemoveMessagesFromQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // remove messages + _queue.removeMessagesFromQueue(0, messageNumber); + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Queue should be empty", 0, entries.size()); + } + + /** + * Tests that dequeued message on the top is not accounted and next message + * is deleted from the queue on invocation of + * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + */ + public void testDeleteMessageFromTopWithDequeuedEntryOnTop() + { + int messageNumber = 4; + int dequeueMessageIndex = 0; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on top + dequeueMessage(_queue, dequeueMessageIndex); + + //delete message from top + _queue.deleteMessageFromTop(); + + //get queue netries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), + messageNumber - 2, entries.size()); + assertEquals("Expected first entry with id 2", new Long(2), + ((AMQMessage) entries.get(0).getMessage()).getMessageId()); + } + + /** + * Tests that all messages including dequeued one are deleted from the queue + * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + */ + public void testClearQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on a test queue + dequeueMessage(_queue, dequeueMessageIndex); + + // clean queue + try + { + _queue.clearQueue(); + } + catch (AMQException e) + { + fail("Failure to clear queue:" + e.getMessage()); + } + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull(entries); + assertEquals(0, entries.size()); + } + + /** + * Tests whether dequeued entry is sent to subscriber in result of + * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} + */ + public void testProcessQueueWithDequeuedEntry() + { + // total number of messages to send + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // create queue with overridden method deliverAsync + SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, null) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing + } + }; + + // put messages + List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); + + // dequeue message + dequeueMessage(testQueue, dequeueMessageIndex); + + // latch to wait for message receipt + final CountDownLatch latch = new CountDownLatch(messageNumber -1); + + // create a subscription + MockSubscription subscription = new MockSubscription() + { + /** + * Send a message and decrement latch + */ + public void send(QueueEntry msg) throws AMQException + { + super.send(msg); + latch.countDown(); + } + }; + + try + { + // subscribe + testQueue.registerSubscription(subscription, false); + + // process queue + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + public void run() + { + // do nothing + } + }); + } + catch (AMQException e) + { + fail("Failure to process queue:" + e.getMessage()); + } + // wait up to 1 minute for message receipt + try + { + latch.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); + verifyRecievedMessages(expected, subscription.getMessages()); + } + + /** + * Tests that entry in dequeued state are not enqueued and not delivered to subscription + */ + public void testEqueueDequeuedEntry() + { + // create a queue where each even entry is considered a dequeued + SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), + false, false, _virtualHost, new QueueEntryListFactory() + { + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + /** + * Override SimpleQueueEntryList to create a dequeued + * entries for messages with even id + */ + return new SimpleQueueEntryList(queue) + { + /** + * Entries with even message id are considered + * dequeued! + */ + protected QueueEntryImpl createQueueEntry(final ServerMessage message) + { + return new QueueEntryImpl(this, message) + { + public boolean isDequeued() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isDispensed() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isAvailable() + { + return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + }; + } + }; + } + }, null); + // create a subscription + MockSubscription subscription = new MockSubscription(); + + // register subscription + try + { + queue.registerSubscription(subscription, false); + } + catch (AMQException e) + { + fail("Failure to register subscription:" + e.getMessage()); + } + + // put test messages into a queue + putGivenNumberOfMessages(queue, 4); + + // assert received messages + List<QueueEntry> messages = subscription.getMessages(); + assertEquals("Only 2 messages should be returned", 2, messages.size()); + assertEquals("ID of first message should be 1", new Long(1), + ((AMQMessage) messages.get(0).getMessage()).getMessageId()); + assertEquals("ID of second message should be 3", new Long(3), + ((AMQMessage) messages.get(1).getMessage()).getMessageId()); + } + + /** + * A helper method to create a queue with given name + * + * @param name + * queue name + * @return queue + */ + private SimpleAMQQueue createQueue(String name) + { + SimpleAMQQueue queue = null; + try + { + AMQShortString queueName = new AMQShortString(name); + AMQShortString ownerName = new AMQShortString(name + "Owner"); + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, + _virtualHost, _arguments); + } + catch (AMQException e) + { + fail("Failure to create a queue:" + e.getMessage()); + } + assertNotNull("Queue was not created", queue); + return queue; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * All messages are asserted that they are present on queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + */ + private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + putGivenNumberOfMessages(queue, messageNumber); + + // make sure that all enqueued messages are on the queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + assertEquals(messageNumber, entries.size()); + for (int i = 0; i < messageNumber; i++) + { + assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId()); + } + return entries; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * Queue is not checked if messages are added into queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + * @param queue + * @param messageNumber + */ + private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + for (int i = 0; i < messageNumber; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = null; + try + { + message = createMessage(messageId); + } + catch (AMQException e) + { + fail("Failure to create a test message:" + e.getMessage()); + } + // Put message on queue + try + { + queue.enqueue(message); + } + catch (AMQException e) + { + fail("Failure to put message on queue:" + e.getMessage()); + } + } + } + + /** + * A helper method to dequeue an entry on queue with given index + * + * @param queue + * queue to dequeue message on + * @param dequeueMessageIndex + * entry index to dequeue. + */ + private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) + { + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + QueueEntry entry = entries.get(dequeueMessageIndex); + entry.acquire(); + entry.dequeue(); + assertTrue(entry.isDequeued()); + return entry; + } + private List<QueueEntry> createEntriesList(QueueEntry... entries) { ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 320a75045a..7136f07ca5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import junit.framework.TestCase; @@ -155,5 +156,55 @@ public class SimpleQueueEntryListTest extends TestCase assertEquals("Count should have been equal",count,remainingMessages.size()); } - + + public void testDequedMessagedNotPresentInIterator() + { + int numberOfMessages = 10; + SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + QueueEntry[] entries = new QueueEntry[numberOfMessages]; + + for(int i = 0; i < numberOfMessages ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntry entry = entryList.add(message); + assertNotNull("QE should not be null", entry); + entries[i]= entry; + } + + // dequeue all even messages + for (QueueEntry queueEntry : entries) + { + long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue(); + if (i%2 == 0) + { + queueEntry.acquire(); + queueEntry.dequeue(); + } + } + + // iterate and check that dequeued messages are not returned by iterator + QueueEntryIterator it = entryList.iterator(); + int counter = 0; + int i = 1; + while (it.advance()) + { + QueueEntry entry = it.getNode(); + Long id = ((AMQMessage)entry.getMessage()).getMessageId(); + assertEquals("Expected message with id " + i + " but got message with id " + + id, new Long(i), id); + counter++; + i += 2; + } + int expectedNumber = numberOfMessages / 2; + assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter, + expectedNumber, counter); + } } |
