diff options
author | Aidan Skinner <aidan@apache.org> | 2008-08-08 10:31:26 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-08-08 10:31:26 +0000 |
commit | ea3c6610750f8659233aec3be65279e16f3264d9 (patch) | |
tree | 4f71e4b0ed364799a9fa0479eef0788dd0ef8b00 /java/broker | |
parent | f16bebd4cc0380d83c1f66dff9db10cad2f9854e (diff) | |
download | qpid-python-ea3c6610750f8659233aec3be65279e16f3264d9.tar.gz |
QPID-1224: add methods to get the list of message ids from a queue, with optional offset. Test class for this.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683932 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
3 files changed, 204 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f7bc2ddafa..c9c252f06d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -104,6 +104,10 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); + List<Long> getMessagesOnTheQueue(int num); + + List<Long> getMessagesOnTheQueue(int num, int offest); + QueueEntry getMessageOnTheQueue(long messageId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 1674c26232..b0f700d4a1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1613,4 +1613,26 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverAsync(_sub); } } + + public List<Long> getMessagesOnTheQueue(int num) + { + return getMessagesOnTheQueue(num, 0); + } + + public List<Long> getMessagesOnTheQueue(int num, int offset) + { + ArrayList<Long> ids = new ArrayList<Long>(num); + QueueEntryIterator it = _entries.iterator(); + for (int i = 0; i < offset; i++) + { + it.advance(); + } + + for (int i = 0; i < num && !it.atTail(); i++) + { + it.advance(); + ids.add(it.getNode().getMessage().getMessageId()); + } + return ids; + } }
\ No newline at end of file diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java new file mode 100644 index 0000000000..c69ca507ef --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -0,0 +1,178 @@ +package org.apache.qpid.server.queue; + +import java.util.List; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import junit.framework.TestCase; + +public class SimpleAMQQueueTest extends TestCase +{ + + private SimpleAMQQueue _queue; + private MessageStore store = new TestableMemoryMessageStore(); + private TransactionalContext ctx = new NonTransactionalContext(store, new StoreContext(), null, null); + private MessageHandleFactory factory = new MessageHandleFactory(); + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + AMQShortString qname = new AMQShortString("qname"); + AMQShortString owner = new AMQShortString("owner"); + _queue = new SimpleAMQQueue(qname, false, owner, false, new VirtualHost("vhost", store)); + } + + public void testGetFirstMessageId() throws Exception + { + // Create message + Long messageId = new Long(23); + AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + + // Put message on queue + _queue.enqueue(null, message); + // Get message id + Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); + + // Check message id + assertEquals("Message ID was wrong", messageId, testmsgid); + } + + public void testGetFirstFiveMessageIds() throws Exception + { + for (int i = 0 ; i < 5; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + // Put message on queue + _queue.enqueue(null, message); + } + // Get message ids + List<Long> msgids = _queue.getMessagesOnTheQueue(5); + + // Check message id + for (int i = 0; i < 5; i++) + { + Long messageId = new Long(i); + assertEquals("Message ID was wrong", messageId, msgids.get(i)); + } + } + + public void testGetLastFiveMessageIds() throws Exception + { + for (int i = 0 ; i < 10; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = new TestMessage(messageId, messageId, info, new StoreContext()); + // Put message on queue + _queue.enqueue(null, message); + } + // Get message ids + List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); + + // Check message id + for (int i = 0; i < 5; i++) + { + Long messageId = new Long(i+5); + assertEquals("Message ID was wrong", messageId, msgids.get(i)); + } + } + + + // FIXME: move this to somewhere useful + private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + { + final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, + null, + false); + try + { + amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), + publishBody, + new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); + } + catch (AMQException e) + { + // won't happen + } + + + return amqMessageHandle; + } + + public class TestMessage extends AMQMessage + { + private final long _tag; + private int _count; + + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + throws AMQException + { + super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + _tag = tag; + } + + + public boolean incrementReference() + { + _count++; + return true; + } + + public void decrementReference(StoreContext context) + { + _count--; + } + + void assertCountEquals(int expected) + { + assertEquals("Wrong count for message with tag " + _tag, expected, _count); + } + } +} |