diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-02 15:13:57 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-02 15:13:57 +0000 |
| commit | 36d2a03f183eecc1ccb7be9e89a56f4aec3dd945 (patch) | |
| tree | 35d120911544f87808c8f1b960f7bad3ad6b8b02 /qpid/java/broker/src/test | |
| parent | ce17ef36e3517d0c5506233d45764818f1a2ad57 (diff) | |
| download | qpid-python-36d2a03f183eecc1ccb7be9e89a56f4aec3dd945.tar.gz | |
QPID-1637 : Added Purger thread for Priority Queues and when threasholds are adjusted.
QueueEntries are now the point of entry to load/unload rather than the List. This is because it is only the QueueEntryList that the QueueEntry that is attached to that can correctly account for the inMemory usage. In the Priority Queue case the priority queue does not know which sub list the QueueEntry is on. As the QEI knows it makes sence to request load/unload through the entry.
Set the default Maximum InMemory to -1, disabled.
Removed the FlowableQueueEntryList interface, merged with QueueEntryList
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@749331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
4 files changed, 141 insertions, 21 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 4716f6691a..c50770d7ba 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void unload() throws UnableToFlowMessageException + public void unload() { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index f73bafd3b4..623f57b224 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.txn.NonTransactionalContext; import java.util.ArrayList; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { - private static final long MESSAGE_SIZE = 100L; + private static final int PRIORITIES = 3; @Override protected void setUp() throws Exception - { + { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); super.setUp(); } @@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); index++; } @@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(byte i) throws AMQException { AMQMessage message = super.createMessage(); - - ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i); + + ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i); return message; } - @Override - public void testMessagesFlowToDisk() throws AMQException, InterruptedException + + public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException { - //Disable this test pending completion of QPID-1637 + int PRIORITIES = 1; + FieldTable arguments = new FieldTable(); + arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); + + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + //Create a priorityQueue + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = PRIORITIES * 2; + int MESSAGE_COUNT = (int) MEMORY_MAX * 2; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + + for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) + { + sendMessage(txnContext, (msgCount % 10)); + } + + //Check that we can hold 10 messages without flowing + assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum()); + assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send another and ensure we are flowed + sendMessage(txnContext); + + assertTrue("Queue is not flowed.", _queue.isFlowed()); + assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + + + //send another 99 so there are 200msgs in total on the queue + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) + { + sendMessage(txnContext); + + long usage = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + usage, + usage <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + usage, usage > 0); + + } + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.registerSubscription(_subscription, false); + + int slept = 0; + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + { + Thread.sleep(500); + slept++; + } + + //Ensure the messages are retreived + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + + //Check the queue is still within it's limits. + assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), + _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); + + for (int index = 0; index < MESSAGE_COUNT; index++) + { + // Ensure that we have received the messages and it wasn't flushed to disk before we received it. + AMQMessage message = _subscription.getMessages().get(index); + assertNotNull("Message:" + message.debugIdentity() + " was null.", message); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0e2b17914c..5d7fa21d56 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.queue; * */ -import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - private static long MESSAGE_SIZE = 100; + protected static long MESSAGE_SIZE = 100; @Override protected void setUp() throws Exception @@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX * 2; //Set the Memory Usage to be very low - _queue.setMemoryUsageMaximum(MEMORY_MAX); + _queue.setMemoryUsageMaximum(MEMORY_MAX); for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) { @@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase assertTrue("Queue has gone over quota:" + usage, usage <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + usage,usage > 0); + assertTrue("Queue has a negative quota:" + usage, usage > 0); } assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); @@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase } //Ensure the messages are retreived - assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); //Check the queue is still within it's limits. - assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), - _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + long current = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() , + current <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0); + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); for (int index = 0; index < MESSAGE_COUNT; index++) { @@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage message = _subscription.getMessages().get(index); assertNotNull("Message:" + message.debugIdentity() + " was null.", message); } + } + + public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException + { + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = 10; + int MESSAGE_COUNT = (int) MEMORY_MAX; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++) + { + sendMessage(txnContext); + } + + //Check that we can hold all messages without flowing + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send anothe and ensure we are flowed + sendMessage(txnContext); + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.setMemoryUsageMaximum(0L); + + //Give the purger time to work + Thread.sleep(200); + + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(0L , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + } + + protected void sendMessage(TransactionalContext txnContext) throws AMQException + { + sendMessage(txnContext, 5); } - private void sendMessage(TransactionalContext txnContext) throws AMQException + protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException { IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); @@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority); msg.setContentHeaderBody(contentHeaderBody); long messageId = msg.getMessageId(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 0c33b406e6..40961a3d2e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase //This is +2 because: // 1 - asyncDelivery Thread - // 2 - queueInhalerThread - assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount()); + // 2 - queue InhalerThread + // 3 - queue PurgerThread + assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount()); queue.stop(); |
