diff options
Diffstat (limited to 'qpid/java/broker/src/test')
6 files changed, 101 insertions, 198 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 8293240905..1d729a82a5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -242,9 +242,9 @@ public class TxAckTest extends TestCase } - public boolean incrementReference() + public boolean incrementReference(int count) { - _count++; + _count+=count; return true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java index db0fc56303..a272da88ac 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java @@ -29,16 +29,15 @@ public class MessageFactoryRecoveryTest extends TestCase public void setUp() { _factory = MessageFactory.getInstance(); - + _factory.reset(); } public void test() { - AMQMessage message = _factory.createMessage(null, false); - - _factory.enableRecover(); - Long messasgeID = message.getMessageId(); + Long messasgeID = 1L; + //Create initial message + _factory.createMessage(messasgeID, null); try { @@ -67,7 +66,7 @@ public class MessageFactoryRecoveryTest extends TestCase messasgeID += 100; try { - message = _factory.createMessage(messasgeID, null); + AMQMessage message = _factory.createMessage(messasgeID, null); assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); } catch (Exception re) @@ -76,7 +75,7 @@ public class MessageFactoryRecoveryTest extends TestCase } // End the reovery process. - _factory.start(); + _factory.recoveryComplete(); //Check we cannot still create by id after ending recovery phase try @@ -96,7 +95,7 @@ public class MessageFactoryRecoveryTest extends TestCase try { - message = _factory.createMessage(null, false); + AMQMessage message = _factory.createMessage(null, false); assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId()); } catch (Exception re) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java new file mode 100644 index 0000000000..44e9851db7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; + +public class MessageReferenceCountingTest extends TestCase +{ + AMQMessage _message; + + public void setUp() + { + _message = MessageFactory.getInstance().createMessage(null, false); + } + + public void testInitialState() + { + + assertTrue("New messages should have a reference", _message.isReferenced()); + } + + public void testIncrementReference() + { + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed ",_message.incrementReference(1)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2)); + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + } + + public void testDecrementReference() + { + assertTrue("Message should maintain Referenced state", _message.isReferenced()); + try + { + _message.decrementReference(null); + } + catch (MessageCleanupException e) + { + fail("Decrement should be allowed:"+e.getMessage()); + } + + assertFalse("Message should not be Referenced state", _message.isReferenced()); + + try + { + _message.decrementReference(null); + fail("Decrement should not be allowed as we should have a ref count of 0"); + } + catch (MessageCleanupException e) + { + assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0")); + } + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index ed7b2923e7..12ff91cdad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -20,193 +20,22 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; - -public class MockQueueEntry implements QueueEntry +public class MockQueueEntry extends QueueEntryImpl { + static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue")); - private AMQMessage _message; - private boolean _redelivered; - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public String debugIdentity() - { - return null; - } - - public boolean delete() - { - return false; - } - - public void dequeue(StoreContext storeContext) throws FailedDequeueException - { - - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException - { - - } - - public void dispose(StoreContext storeContext) throws MessageCleanupException - { - - } - - public boolean expired() throws AMQException - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public AMQMessage getMessage() - { - return _message; - } - - public AMQQueue getQueue() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean immediateAndNotDelivered() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - - public boolean isQueueDeleted() - { - - return false; - } - - - public boolean isRejectedBy(Subscription subscription) - { - - return false; - } - - - public void reject() - { - - - } - - - public void reject(Subscription subscription) - { - - - } - - - public void release() - { - - - } - - - public boolean removeStateChangeListener(StateChangeListener listener) - { - - return false; - } - - - public void requeue(StoreContext storeContext) throws AMQException - { - - - } - - - public void setDeliveredToSubscription() - { - - - } - - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - - - public int compareTo(QueueEntry o) - { - - return 0; - } - - public void setMessage(AMQMessage msg) - { - _message = msg; - } - - public ContentHeaderBody getContentHeaderBody() throws AMQException + public MockQueueEntry() { - return _message.getContentHeaderBody(); + super(_defaultList); } - public boolean isPersistent() throws AMQException + public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message) { - return _message.isPersistent(); + super(queueEntryList, message); } - public boolean isRedelivered() + public MockQueueEntry(AMQMessage message) { - return _redelivered; + super(_defaultList, message); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f95b1eb83e..665ca089da 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -296,7 +296,7 @@ public class SimpleAMQQueueTest extends TestCase public void testGetLastFiveMessageIds() throws Exception { AMQMessage message = createMessage(); - Long messageIdOffset = message.getMessageId() -1 ; + Long messageIdOffset = message.getMessageId() - 1; for (int i = 0; i < 10; i++) { // Put message on queue @@ -335,7 +335,6 @@ public class SimpleAMQQueueTest extends TestCase msg.enqueue(qs); msg.routingComplete(_store); - _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued @@ -343,14 +342,13 @@ public class SimpleAMQQueueTest extends TestCase assertNotNull(data); // Dequeue message - MockQueueEntry entry = new MockQueueEntry(); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - entry.setMessage(message); + MockQueueEntry entry = new MockQueueEntry(message); _queue.dequeue(null, entry); // Check that it is dequeued @@ -408,9 +406,9 @@ public class SimpleAMQQueueTest extends TestCase _tag = getMessageId(); } - public boolean incrementReference() + public boolean incrementReference(int count) { - _count++; + _count+=count; return true; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 48d69c5bad..e8acfc2fda 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -59,7 +59,7 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message = message.takeReference(); + message.incrementReference(1); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); @@ -89,10 +89,10 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message = message.takeReference(); + message.incrementReference(1); assertEquals(1, _store.getMessageMetaDataMap().size()); - message = message.takeReference(); + message.incrementReference(1); message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } |
