summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java77
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java187
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java6
6 files changed, 101 insertions, 198 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 8293240905..1d729a82a5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -242,9 +242,9 @@ public class TxAckTest extends TestCase
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
index db0fc56303..a272da88ac 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
@@ -29,16 +29,15 @@ public class MessageFactoryRecoveryTest extends TestCase
public void setUp()
{
_factory = MessageFactory.getInstance();
-
+ _factory.reset();
}
public void test()
{
- AMQMessage message = _factory.createMessage(null, false);
-
- _factory.enableRecover();
- Long messasgeID = message.getMessageId();
+ Long messasgeID = 1L;
+ //Create initial message
+ _factory.createMessage(messasgeID, null);
try
{
@@ -67,7 +66,7 @@ public class MessageFactoryRecoveryTest extends TestCase
messasgeID += 100;
try
{
- message = _factory.createMessage(messasgeID, null);
+ AMQMessage message = _factory.createMessage(messasgeID, null);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
@@ -76,7 +75,7 @@ public class MessageFactoryRecoveryTest extends TestCase
}
// End the reovery process.
- _factory.start();
+ _factory.recoveryComplete();
//Check we cannot still create by id after ending recovery phase
try
@@ -96,7 +95,7 @@ public class MessageFactoryRecoveryTest extends TestCase
try
{
- message = _factory.createMessage(null, false);
+ AMQMessage message = _factory.createMessage(null, false);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
new file mode 100644
index 0000000000..44e9851db7
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageReferenceCountingTest extends TestCase
+{
+ AMQMessage _message;
+
+ public void setUp()
+ {
+ _message = MessageFactory.getInstance().createMessage(null, false);
+ }
+
+ public void testInitialState()
+ {
+
+ assertTrue("New messages should have a reference", _message.isReferenced());
+ }
+
+ public void testIncrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ }
+
+ public void testDecrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ try
+ {
+ _message.decrementReference(null);
+ }
+ catch (MessageCleanupException e)
+ {
+ fail("Decrement should be allowed:"+e.getMessage());
+ }
+
+ assertFalse("Message should not be Referenced state", _message.isReferenced());
+
+ try
+ {
+ _message.decrementReference(null);
+ fail("Decrement should not be allowed as we should have a ref count of 0");
+ }
+ catch (MessageCleanupException e)
+ {
+ assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index ed7b2923e7..12ff91cdad 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -20,193 +20,22 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-public class MockQueueEntry implements QueueEntry
+public class MockQueueEntry extends QueueEntryImpl
{
+ static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue"));
- private AMQMessage _message;
- private boolean _redelivered;
-
- public boolean acquire()
- {
- return false;
- }
-
- public boolean acquire(Subscription sub)
- {
- return false;
- }
-
- public boolean acquiredBySubscription()
- {
- return false;
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
-
- }
-
- public String debugIdentity()
- {
- return null;
- }
-
- public boolean delete()
- {
- return false;
- }
-
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
- {
-
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
- {
-
- }
-
- public void dispose(StoreContext storeContext) throws MessageCleanupException
- {
-
- }
-
- public boolean expired() throws AMQException
- {
- return false;
- }
-
- public Subscription getDeliveredSubscription()
- {
- return null;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return false;
- }
-
- public AMQMessage getMessage()
- {
- return _message;
- }
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean immediateAndNotDelivered()
- {
- return false;
- }
-
- public boolean isAcquired()
- {
- return false;
- }
-
- public boolean isDeleted()
- {
- return false;
- }
-
-
- public boolean isQueueDeleted()
- {
-
- return false;
- }
-
-
- public boolean isRejectedBy(Subscription subscription)
- {
-
- return false;
- }
-
-
- public void reject()
- {
-
-
- }
-
-
- public void reject(Subscription subscription)
- {
-
-
- }
-
-
- public void release()
- {
-
-
- }
-
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
-
- return false;
- }
-
-
- public void requeue(StoreContext storeContext) throws AMQException
- {
-
-
- }
-
-
- public void setDeliveredToSubscription()
- {
-
-
- }
-
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
-
- public int compareTo(QueueEntry o)
- {
-
- return 0;
- }
-
- public void setMessage(AMQMessage msg)
- {
- _message = msg;
- }
-
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public MockQueueEntry()
{
- return _message.getContentHeaderBody();
+ super(_defaultList);
}
- public boolean isPersistent() throws AMQException
+ public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
- return _message.isPersistent();
+ super(queueEntryList, message);
}
- public boolean isRedelivered()
+ public MockQueueEntry(AMQMessage message)
{
- return _redelivered;
+ super(_defaultList, message);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f95b1eb83e..665ca089da 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -296,7 +296,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testGetLastFiveMessageIds() throws Exception
{
AMQMessage message = createMessage();
- Long messageIdOffset = message.getMessageId() -1 ;
+ Long messageIdOffset = message.getMessageId() - 1;
for (int i = 0; i < 10; i++)
{
// Put message on queue
@@ -335,7 +335,6 @@ public class SimpleAMQQueueTest extends TestCase
msg.enqueue(qs);
msg.routingComplete(_store);
-
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
@@ -343,14 +342,13 @@ public class SimpleAMQQueueTest extends TestCase
assertNotNull(data);
// Dequeue message
- MockQueueEntry entry = new MockQueueEntry();
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- entry.setMessage(message);
+ MockQueueEntry entry = new MockQueueEntry(message);
_queue.dequeue(null, entry);
// Check that it is dequeued
@@ -408,9 +406,9 @@ public class SimpleAMQQueueTest extends TestCase
_tag = getMessageId();
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 48d69c5bad..e8acfc2fda 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -59,7 +59,7 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
@@ -89,10 +89,10 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
assertEquals(1, _store.getMessageMetaDataMap().size());
- message = message.takeReference();
+ message.incrementReference(1);
message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}