summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-27 13:35:38 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-27 13:35:38 +0000
commit5ea9fa4baa5219c7666ccddb04703289e56cbd6f (patch)
tree9a60102c9c04bc9155120fdf022196b11d803260 /qpid/java/broker/src/test
parent415ab7c63ae295e275eed2b7d3838555b6519d44 (diff)
downloadqpid-python-5ea9fa4baa5219c7666ccddb04703289e56cbd6f.tar.gz
QPID-1635,QPID-1636 : Moved additional properties from AMQMessage up to QueueEntry to allow processing whilst messasge has been flowed. Moved : _flags (for Immediate and delivered status), expiry, messageID. Created base class to maintain counts of data and objects in queue. Removed this responsibility from the AMQQueues and on to the QueueEntryLists. This will more easily allow the QEL structure to be flowed to disk at a later stage. Updated tests as a result of moves.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@748516 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java199
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java114
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java177
6 files changed, 328 insertions, 202 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 78cf610f28..6021f100f5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.UnableToFlowMessageException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -229,6 +230,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void setExpiration(long expiration)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isAcquired()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -264,6 +270,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void setDeliveredToConsumer()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void release()
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -314,32 +325,38 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void restoreCredit()
+ public boolean isQueueDeleted()
{
- //To change body of implemented methods use File | Settings | File Templates.
+ return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
+ public void addStateChangeListener(StateChangeListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public boolean isQueueDeleted()
+ public boolean removeStateChangeListener(StateChangeListener listener)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void flow() throws UnableToFlowMessageException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public void recover()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isFlowed()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index ba02e6f6bd..a11e60d7de 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -102,4 +102,11 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
return message;
}
+
+ @Override
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ //Disable this test pending completion of QPID-1637
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index cc6c486e11..b38da53406 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
public class MockAMQMessage extends TransientAMQMessage
{
@@ -29,6 +30,7 @@ public class MockAMQMessage extends TransientAMQMessage
throws AMQException
{
super(messageId);
+ _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
index f7cd860c22..9e12e1bef7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
@@ -21,16 +21,25 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class QueueEntryImplTest extends TestCase
{
- /**
- * Test the Redelivered state of a QueueEntryImpl
- */
+ /** Test the Redelivered state of a QueueEntryImpl */
public void testRedelivered()
{
- QueueEntry entry = new QueueEntryImpl(null, null);
+ QueueEntry entry = new MockQueueEntry(null);
assertFalse("New message should not be redelivered", entry.isRedelivered());
@@ -45,5 +54,187 @@ public class QueueEntryImplTest extends TestCase
}
+ public void testImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertTrue("Undelivered Immediate message should still be marked as so", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Immediate message now be marked as so", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNotImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("Undelivered Non-Immediate message should not result in true.", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Non-Immediate message not change this return", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+ message.setExpiration(System.currentTimeMillis() + 10L);
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 20 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertTrue("After a sleep messages should now be expired.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNoExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 10 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertFalse("After a sleep messages without an expiry should not expire.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 7a97837208..9a5f7f20c6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,6 @@ package org.apache.qpid.server.queue;
*/
import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -35,13 +33,13 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +49,7 @@ public class SimpleAMQQueueTest extends TestCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -70,7 +68,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -320,8 +318,8 @@ public class SimpleAMQQueueTest extends TestCase
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -335,18 +333,18 @@ public class SimpleAMQQueueTest extends TestCase
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store);
+ msg.routingComplete(_transactionLog);
- _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+ _transactionLog.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _transactionLog);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
MockQueueEntry entry = new MockQueueEntry(message, _queue);
@@ -355,10 +353,97 @@ public class SimpleAMQQueueTest extends TestCase
entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessageReferenceMap(messageId);
+ data = _transactionLog.getMessageReferenceMap(messageId);
assertNull(data);
}
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(10);
+
+ for (int msgCount = 0; msgCount < 10; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(10, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(11, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ //send another 9 so there are 20msgs in total on the queue
+ for (int msgCount = 0; msgCount < 9; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+ assertEquals(20, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ Thread.sleep(200);
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+
+ //Ensure we got the content
+ for (int index = 0; index < 10; index++)
+ {
+ QueueEntry entry = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + index + " was null.", entry.getMessage());
+ assertTrue(!entry.isFlowed());
+ }
+
+ //ensure we were received 10 flowed messages
+ for (int index = 10; index < 20; index++)
+ {
+ QueueEntry entry = _subscription.getMessages().get(index);
+ assertNull("Message:" + index + " was not null.", entry.getMessage());
+ assertTrue(entry.isFlowed());
+ }
+ }
+
+ private void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = 1;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent 10 messages
+
+ qs.add(_queue);
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ msg.addContentBodyFrame(new MockContentChunk(1));
+
+ msg.deliverToQueues();
+
+ //Check message was correctly enqueued
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(data);
+ }
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
@@ -384,7 +469,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info, _store);
+ AMQMessage message = new TestMessage(info, _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -410,7 +495,6 @@ public class SimpleAMQQueueTest extends TestCase
_transactionLog = transactionLog;
}
-
void assertCountEquals(int expected)
{
assertEquals("Wrong count for message with tag " + _tag, expected,
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
index 16d1ab60f3..6fd153f398 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -287,180 +287,5 @@ public class TransientMessageTest extends TestCase
assertFalse(_message.isPersistent());
}
- public void testImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testNotImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
- _message.setExpiration(System.currentTimeMillis() + 10L);
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 20 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertTrue("After a sleep messages should now be expired.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
-
- public void testNoExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 10 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
+
}