diff options
| author | Robert Gemmell <robbie@apache.org> | 2009-07-21 09:05:21 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2009-07-21 09:05:21 +0000 |
| commit | 270c4f63c40417af21c7fb72ef1b027d9dfc5987 (patch) | |
| tree | d9e92258b01c5f223e3dc88cc2532dacaa7ef03e /qpid/java | |
| parent | aa39f625f7686b801922876e60f3ee9b505a6228 (diff) | |
| download | qpid-python-270c4f63c40417af21c7fb72ef1b027d9dfc5987.tar.gz | |
QPID-1961: expand viewMessages() queue operation to support long parameters, deprecate previous int version.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@796196 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 181 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 014b348822..4643326df3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -112,6 +112,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> List<Long> getMessagesOnTheQueue(int num, int offest); QueueEntry getMessageOnTheQueue(long messageId); + + /** + * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. + * + * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. + * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. + * @param fromPosition + * @param toPosition + * @return + */ + public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition); void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 785b668687..77c45d6d16 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -380,25 +380,45 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Returns the header contents of the messages stored in this queue in tabular form. + * Deprecated as of Qpid JMX API 1.3 */ + @Deprecated public TabularData viewMessages(int beginIndex, int endIndex) throws JMException { - if ((beginIndex > endIndex) || (beginIndex < 1)) + return viewMessages((long)beginIndex,(long)endIndex); + } + + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + * @param startPosition The queue position of the first message to be viewed + * @param endPosition The queue position of the last message to be viewed + */ + public TabularData viewMessages(long startPosition, long endPosition) throws JMException + { + if ((startPosition > endPosition) || (startPosition < 1)) { - throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); } + + if ((endPosition - startPosition) > Integer.MAX_VALUE) + { + throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size"); + } - List<QueueEntry> list = _queue.getMessagesOnTheQueue(); + List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); try { // Create the tabular list of message header contents - for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) + int size = list.size(); + + for (int i = 0; i < size ; i++) { - long position = i; - AMQMessage msg = list.get(i - 1).getMessage(); + long position = startPosition + i; + AMQMessage msg = list.get(i).getMessage(); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 08daf715ef..e994967dc5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -813,6 +813,43 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return entryList; } + + /** + * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. + * + * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. + * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. + * @param fromPosition + * @param toPosition + * @return + */ + public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) + { + List<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + + QueueEntryIterator it = _entries.iterator(); + + long index = 1; + for ( ; index < fromPosition && !it.atTail(); index++) + { + it.advance(); + } + + if(index < fromPosition) + { + //The queue does not contain enough entries to reach our range. + //return the empty list. + return queueEntries; + } + + for ( ; index <= toPosition && !it.atTail(); index++) + { + it.advance(); + queueEntries.add(it.getNode()); + } + + return queueEntries; + } public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index ce986cf55b..80a9275954 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -187,7 +187,7 @@ public class AMQQueueMBeanTest extends TestCase { try { - _queueMBean.viewMessages(0, 3); + _queueMBean.viewMessages(0L, 3L); fail(); } catch (JMException ex) @@ -197,7 +197,7 @@ public class AMQQueueMBeanTest extends TestCase try { - _queueMBean.viewMessages(2, 1); + _queueMBean.viewMessages(2L, 1L); fail(); } catch (JMException ex) @@ -207,13 +207,25 @@ public class AMQQueueMBeanTest extends TestCase try { - _queueMBean.viewMessages(-1, 1); + _queueMBean.viewMessages(-1L, 1L); fail(); } catch (JMException ex) { } + + try + { + long end = Integer.MAX_VALUE; + end+=2; + _queueMBean.viewMessages(1L, end); + fail("Expected Exception due to oversized(> 2^31) message range"); + } + catch (JMException ex) + { + + } IncomingMessage msg = message(false, false); long id = msg.getMessageId(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 20503bf15c..f73366c197 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -209,6 +209,11 @@ public class MockAMQQueue implements AMQQueue { return null; //To change body of implemented methods use File | Settings | File Templates. } + + public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition) + { + return null; + } public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 3084dc7fa1..1c11a7926d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -335,6 +335,69 @@ public class SimpleAMQQueueTest extends TestCase assertEquals("Message ID was wrong", messageId, msgids.get(i)); } } + + public void testGetMessagesRangeOnTheQueue() throws Exception + { + for (int i = 1 ; i <= 10; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = createMessage(messageId); + // Put message on queue + _queue.enqueue(null, message); + } + + // Get non-existent 0th QueueEntry & check returned list was empty + // (the position parameters in this method are indexed from 1) + List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0); + assertTrue(entries.size() == 0); + + // Check that when 'from' is 0 it is ignored and the range continues from 1 + entries = _queue.getMessagesRangeOnTheQueue(0, 2); + assertTrue(entries.size() == 2); + long msgID = entries.get(0).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 1L); + msgID = entries.get(1).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 2L); + + // Check that when 'from' is greater than 'to' the returned list is empty + entries = _queue.getMessagesRangeOnTheQueue(5, 4); + assertTrue(entries.size() == 0); + + // Get first QueueEntry & check id + entries = _queue.getMessagesRangeOnTheQueue(1, 1); + assertTrue(entries.size() == 1); + msgID = entries.get(0).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 1L); + + // Get 5th,6th,7th entries and check id's + entries = _queue.getMessagesRangeOnTheQueue(5, 7); + assertTrue(entries.size() == 3); + msgID = entries.get(0).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 5L); + msgID = entries.get(1).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 6L); + msgID = entries.get(2).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 7L); + + // Get 10th QueueEntry & check id + entries = _queue.getMessagesRangeOnTheQueue(10, 10); + assertTrue(entries.size() == 1); + msgID = entries.get(0).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 10L); + + // Get non-existent 11th QueueEntry & check returned set was empty + entries = _queue.getMessagesRangeOnTheQueue(11, 11); + assertTrue(entries.size() == 0); + + // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs + entries = _queue.getMessagesRangeOnTheQueue(9, 11); + assertTrue(entries.size() == 2); + msgID = entries.get(0).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 9L); + msgID = entries.get(1).getMessage().getMessageId(); + assertEquals("Message ID was wrong", msgID, 10L); + } public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java index 9c21d64cdf..1232cfc9b4 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java @@ -207,18 +207,42 @@ public interface ManagedQueue /** * Returns a subset of all the messages stored in the queue. The messages * are returned based on the given index numbers. + * + * Deprecated as of Qpid JMX API 1.3 * @param fromIndex * @param toIndex * @return * @throws IOException * @throws JMException */ + @Deprecated @MBeanOperation(name="viewMessages", description="Message headers for messages in this queue within given index range. eg. from index 1 - 100") TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex, @MBeanOperationParameter(name="to index", description="to index")int toIndex) throws IOException, JMException; + + /** + * Returns a subset (up to 2^31 messages at a time) of all the messages stored on the queue. + * The messages are returned based on the given queue position range. + * @param startPosition + * @param endPosition + * @return + * @throws IOException + * @throws JMException + */ + @MBeanOperation(name="viewMessages", + description="Message headers for messages in this queue within given queue positions range. eg. from index 1 - 100") + TabularData viewMessages(@MBeanOperationParameter(name="start position", description="start position")long startPosition, + @MBeanOperationParameter(name="end position", description="end position")long endPosition) + throws IOException, JMException; + /** + * Returns the content for the given AMQ Message ID. + * + * @throws IOException + * @throws JMException + */ @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id") CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) throws IOException, JMException; |
