summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-12-18 23:00:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-12-18 23:00:40 +0000
commit8a916973a6a6209d49af5363b10d3b29af2b151f (patch)
tree6f909e235183684d703e3b4d9ff22afe53297474 /java/systests/src
parentb9857867db0ea728abb837027fc164f8d01b1191 (diff)
downloadqpid-python-8a916973a6a6209d49af5363b10d3b29af2b151f.tar.gz
QPID-711 : create a QueueEntry class and move message-on-queue functions (such as taken()) to this class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@605352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java24
7 files changed, 35 insertions, 33 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 3ee8277eba..10189a8017 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -133,7 +134,7 @@ public class TxAckTest extends TestCase
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
}
_acked = acked;
_unacked = unacked;
@@ -150,7 +151,7 @@ public class TxAckTest extends TestCase
{
UnacknowledgedMessage u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.message).assertCountEquals(expected);
+ ((TestMessage) u.getMessage()).assertCountEquals(expected);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index ff5517bdd5..58323086b5 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -250,9 +251,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
{
- messages.add(new HeadersExchangeTest.Message(msg));
+ messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index be788a02da..790607e268 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -142,7 +142,7 @@ public class AckTest extends TestCase
msg.incrementReference();
msg.routingComplete(_messageStore, _storeContext, factory);
// we manually send the message to the subscription
- _subscription.send(msg, _queue);
+ _subscription.send(new QueueEntry(_queue,msg), _queue);
}
}
@@ -167,7 +167,7 @@ public class AckTest extends TestCase
assertTrue(deliveryTag == i);
i++;
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
}
assertTrue(map.size() == msgCount);
@@ -228,7 +228,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -257,7 +257,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
@@ -281,7 +281,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
index 068f37574d..282ad3ed5e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
@@ -42,9 +42,9 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();
- private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private final List<QueueEntry> _messages = new ArrayList<QueueEntry>();
private int next = 0;//index to next message to send
- private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>());
private final Executor _executor = new OnCurrentThreadExecutor();
private final List<Thread> _threads = new ArrayList<Thread>();
@@ -159,7 +159,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
}
}
- private AMQMessage nextMessage()
+ private QueueEntry nextMessage()
{
synchronized (_messages)
{
@@ -191,7 +191,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
{
void doRun() throws Throwable
{
- AMQMessage msg = nextMessage();
+ QueueEntry msg = nextMessage();
if (msg != null)
{
_deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index dc5a6d3cf6..b33259cfba 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -40,7 +40,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public void testStartInQueueingMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -85,7 +85,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public void testStartInDirectMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -132,7 +132,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
{
try
{
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
@@ -154,7 +154,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
SubscriptionTestHelper s = new SubscriptionTestHelper("A");
_subscriptions.addSubscriber(s);
s.setSuspended(true);
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 88272023e8..812aec6a5d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -55,12 +55,12 @@ class MessageTestHelper extends TestCase
ApplicationRegistry.initialise(new NullApplicationRegistry());
}
- AMQMessage message() throws AMQException
+ QueueEntry message() throws AMQException
{
return message(false);
}
- AMQMessage message(final boolean immediate) throws AMQException
+ QueueEntry message(final boolean immediate) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -86,8 +86,8 @@ class MessageTestHelper extends TestCase
}
};
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
+ return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
+ new ContentHeaderBody()));
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fe947ef3bc..5846ad0a9d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -28,13 +28,13 @@ import java.util.Queue;
public class SubscriptionTestHelper implements Subscription
{
- private final List<AMQMessage> messages;
+ private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
public SubscriptionTestHelper(Object key)
{
- this(key, new ArrayList<AMQMessage>());
+ this(key, new ArrayList<QueueEntry>());
}
public SubscriptionTestHelper(final Object key, final boolean isSuspended)
@@ -43,18 +43,18 @@ public class SubscriptionTestHelper implements Subscription
setSuspended(isSuspended);
}
- SubscriptionTestHelper(Object key, List<AMQMessage> messages)
+ SubscriptionTestHelper(Object key, List<QueueEntry> messages)
{
this.key = key;
this.messages = messages;
}
- List<AMQMessage> getMessages()
+ List<QueueEntry> getMessages()
{
return messages;
}
- public void send(AMQMessage msg, AMQQueue queue)
+ public void send(QueueEntry msg, AMQQueue queue)
{
messages.add(msg);
}
@@ -69,12 +69,12 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
return isSuspended;
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
//no-op
}
@@ -98,27 +98,27 @@ public class SubscriptionTestHelper implements Subscription
return false;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry msg)
{
return true;
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return null;
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
return null;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
return messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
//no-op
}