diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:50:01 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:50:01 +0000 |
| commit | 67d5fb5c48b224efea5134c455719670398dd5eb (patch) | |
| tree | 9f8e8e77d326bf37069eeae5b15399e92e3f4545 /qpid/java/broker/src/test | |
| parent | 0db29114e114d6073494de791785df237cfda475 (diff) | |
| download | qpid-python-67d5fb5c48b224efea5134c455719670398dd5eb.tar.gz | |
QPID-1632 - Removal of reference counting and update to tests, TxAckTest was reduced in size as reference counting is now not modified until the transaction completes.
Replaced MessageReferenceCoutingTest with PersistentMessageTest and further tests wil be created when DerbyDBMessageStore is also updated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@746260 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
11 files changed, 284 insertions, 232 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 1d729a82a5..52b8b0ad19 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.AMQMessage; @@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.TransientAMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -81,20 +83,6 @@ public class TxAckTest extends TestCase combined.stop(); } - public void testPrepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - public void testUndoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - public void testCommit() throws AMQException { individual.commit(); @@ -115,12 +103,13 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; + private TransactionLog _transactionLog = new TestableMemoryMessageStore(); private static final int MESSAGE_SIZE=100; Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { - TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), + TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); @@ -138,12 +127,15 @@ public class TxAckTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - AMQMessage message = new TestMessage(deliveryTag, info); + AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; message.setPublishAndContentHeaderBody(_storeContext, info, header); + + + _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); } _acked = acked; @@ -165,25 +157,6 @@ public class TxAckTest extends TestCase } } - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(_storeContext); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - void commit() { _op.consolidate(); @@ -232,30 +205,22 @@ public class TxAckTest extends TestCase private class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(long tag, MessagePublishInfo publishBody) + public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(createMessage( publishBody)); _tag = tag; + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId); + int actual = (list == null ? 0 : list.size()); + assertEquals("Wrong count for message with tag " + _tag, expected, actual); } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 40b08a2e39..78cf610f28 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 98465eda20..9f8d5f9a99 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -57,7 +59,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestMemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); @@ -72,14 +74,15 @@ public class AckTest extends TestCase super.setUp(); ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); - _messageStore = new TestMemoryMessageStore(); + VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog()); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), - null); + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), + true, vhost, null); } protected void tearDown() @@ -185,7 +188,7 @@ public class AckTest extends TestCase /** * Tests that in no-ack mode no messages are retained */ - public void testPersistentNoAckMode() throws AMQException + public void testPersistentNoAckMode() throws AMQException, InterruptedException { // false arg means no acks expected _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); @@ -194,7 +197,7 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0); assertTrue(_messageStore.getContentBodyMap().size() == 0); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java deleted file mode 100644 index 44e9851db7..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; - -public class MessageReferenceCountingTest extends TestCase -{ - AMQMessage _message; - - public void setUp() - { - _message = MessageFactory.getInstance().createMessage(null, false); - } - - public void testInitialState() - { - - assertTrue("New messages should have a reference", _message.isReferenced()); - } - - public void testIncrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed ",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2)); - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - } - - public void testDecrementReference() - { - assertTrue("Message should maintain Referenced state", _message.isReferenced()); - try - { - _message.decrementReference(null); - } - catch (MessageCleanupException e) - { - fail("Decrement should be allowed:"+e.getMessage()); - } - - assertFalse("Message should not be Referenced state", _message.isReferenced()); - - try - { - _message.decrementReference(null); - fail("Decrement should not be allowed as we should have a ref count of 0"); - } - catch (MessageCleanupException e) - { - assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0")); - } - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 12ff91cdad..92235648ec 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl { super(_defaultList, message); } + + public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue) + { + super(new SimpleQueueEntryList(queue) ,message); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index fdaf2c309f..4551ae5af8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -20,18 +20,46 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; public class PersistentMessageTest extends TransientMessageTest { - private MemoryMessageStore _messageStore; + private TestableMemoryMessageStore _messageStore; + + protected SimpleAMQQueue _queue; + protected AMQShortString _q1name = new AMQShortString("q1name"); + protected AMQShortString _owner = new AMQShortString("owner"); + protected AMQShortString _routingKey = new AMQShortString("routing key"); + private TransactionalContext _messageDeliveryContext; + private static final long MESSAGE_SIZE = 0L; + private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); - public void setUp() + public void setUp() throws Exception { - _messageStore = new MemoryMessageStore(); - _messageStore.configure(); + _messageStore = new TestableMemoryMessageStore(); + _storeContext = new StoreContext(); + VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore); + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null); + // Create IncomingMessage and nondurable queue + _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages); + } @Override @@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest assertTrue(_message.isPersistent()); } + /** + * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and + * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right + * contents. TransactionLog = Empty, returnMessages 1 item. + * + * @throws Exception + */ + public void testImmediateReturnNotInLog() throws Exception + { + MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null); + IncomingMessage msg = createMessage(info); + + // Send persistent message + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + + // equivalent to amqChannel.routeMessage() + msg.enqueue(qs); + + msg.routingComplete(_messageStore); + + // equivalent to amqChannel.deliverCurrentMessageIfComplete + msg.deliverToQueues(); + + // Check that data has been stored to disk + long messageId = msg.getMessageId(); + checkMessageMetaDataExists(messageId); + + // Check that it was not enqueued + List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); + assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList); + checkMessageMetaDataRemoved(messageId); + + assertEquals("Return message count not correct", 1, _returnMessages.size()); + } + + protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException + { + IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext, + new MockProtocolSession(_messageStore), _messageStore); + + // equivalent to amqChannel.publishContenHeader + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID; + // This message has no bodies + contentHeaderBody.bodySize = MESSAGE_SIZE; + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + + msg.setContentHeaderBody(contentHeaderBody); + msg.setExpiration(); + + return msg; + } + + protected void checkMessageMetaDataExists(long messageId) + { + try + { + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId); + } + catch (AMQException amqe) + { + fail("Message MetaData does not exist for message:" + messageId); + } + } + + protected void checkMessageMetaDataRemoved(long messageId) + { + try + { + assertNull("Message MetaData still exists for message:" + messageId, + _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); + assertNull("Message still has values in the reference map:" + messageId, + _messageStore.getMessageReferenceMap(messageId)); + + } + catch (AMQException e) + { + fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage()); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 665ca089da..7a97837208 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.ArrayList; import java.util.List; @@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); - IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store); + IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.properties = new BasicContentHeaderProperties(); @@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued - AMQQueue data = _store.getMessages().get(messageId); + List<AMQQueue> data = _store.getMessageReferenceMap(messageId); assertNotNull(data); // Dequeue message - ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store); message.setPublishAndContentHeaderBody(new StoreContext(), info, header); - MockQueueEntry entry = new MockQueueEntry(message); - _queue.dequeue(null, entry); + MockQueueEntry entry = new MockQueueEntry(message, _queue); + entry.getQueueEntryList().add(message); + entry.acquire(); + entry.dequeue(null); // Check that it is dequeued - data = _store.getMessages().get(messageId); + data = _store.getMessageReferenceMap(messageId); assertNull(data); } @@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase public AMQMessage createMessage() throws AMQException { - AMQMessage message = new TestMessage(info); + AMQMessage message = new TestMessage(info, _store); ContentHeaderBody header = new ContentHeaderBody(); header.bodySize = MESSAGE_SIZE; @@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase public class TestMessage extends TransientAMQMessage { private final long _tag; - private int _count; + private TestTransactionLog _transactionLog; - TestMessage(MessagePublishInfo publishBody) + TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog) throws AMQException { super(SimpleAMQQueueTest.createMessage(publishBody)); _tag = getMessageId(); + _transactionLog = transactionLog; } - public boolean incrementReference(int count) - { - _count+=count; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } void assertCountEquals(int expected) { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); + assertEquals("Wrong count for message with tag " + _tag, expected, + _transactionLog.getMessageReferenceMap(_messageId).size()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index e8acfc2fda..5a4c435e59 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage; */ public class TestReferenceCounting extends TestCase { - private TestMemoryMessageStore _store; + private TestableMemoryMessageStore _store; private StoreContext _storeContext = new StoreContext(); @@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase protected void setUp() throws Exception { super.setUp(); - _store = new TestMemoryMessageStore(); + _store = new TestableMemoryMessageStore(); } /** @@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final long messageId = _store.getNewMessageId(); - AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } @@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(); - final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - message.incrementReference(1); - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.incrementReference(1); - message.decrementReference(_storeContext); assertEquals(1, _store.getMessageMetaDataMap().size()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java index 4e48435962..bb051693c3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -20,32 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.queue.AMQQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.List; -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestMemoryMessageStore extends MemoryMessageStore +public interface TestTransactionLog { - public TestMemoryMessageStore() - { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); - } - - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() - { - return _metaDataMap; - } - - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - return _contentBodyMap; - } + public List<AMQQueue> getMessageReferenceMap(Long messageID); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9146fe88ae..882f88b8f3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -23,22 +23,28 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.commons.configuration.Configuration; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore extends MemoryMessageStore +public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable { MemoryMessageStore _mms = null; - private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + _mms = new MemoryMessageStore(); + _mms.configure(); } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() { - if (_mms != null) - { - return _mms._metaDataMap; - } - else - { - return _metaDataMap; - } + return _mms._metaDataMap; } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - if (_mms != null) - { - return _mms._contentBodyMap; - } - else - { - return _contentBodyMap; - } + return _mms._contentBodyMap; } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + + public List<AMQQueue> getMessageReferenceMap(Long messageId) + { + return _mms._messageEnqueueMap.get(messageId); + } + + public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + { + _mms.configure(virtualHost,base,config); + } + + public void close() throws Exception + { + _mms.close(); + } + + public void createExchange(Exchange exchange) throws AMQException + { + _mms.createExchange(exchange); + } + + public void removeExchange(Exchange exchange) throws AMQException + { + _mms.removeExchange(exchange); + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.bindQueue(exchange,routingKey,queue,args); + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + _mms.unbindQueue(exchange,routingKey,queue,args); + } + + public void createQueue(AMQQueue queue) throws AMQException + { + _mms.createQueue(queue); + } + + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException + { + _mms.createQueue(queue,arguments); + } + + public void removeQueue(AMQQueue queue) throws AMQException + { + _mms.removeQueue(queue); + } + + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + _mms.removeMessage(storeContext, messageId); + } + + public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.enqueueMessage(context,queue,messageId); + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + _mms.dequeueMessage(context,queue,messageId); + } + + public void beginTran(StoreContext context) throws AMQException + { + _mms.beginTran(context); + } + + public void commitTran(StoreContext context) throws AMQException + { + _mms.commitTran(context); + } + + public void abortTran(StoreContext context) throws AMQException + { + _mms.abortTran(context); + } + + public boolean inTran(StoreContext context) + { + return _mms.inTran(context); + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + _mms.storeMessageMetaData(context,messageId,messageMetaData); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { - getMessages().put(messageId, queue); + return _mms.getMessageMetaData(context,messageId); } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - getMessages().remove(messageId); + return _mms.getContentBodyChunk(context,messageId,index); } - public HashMap<Long, AMQQueue> getMessages() + public boolean isPersistent() { - return _messages; + return _mms.isPersistent(); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index ca6644d141..26802b4210 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.txn; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.LinkedList; @@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase } } - class MockStore extends TestMemoryMessageStore + class MockStore extends TestableMemoryMessageStore { final Object BEGIN = "BEGIN"; final Object ABORT = "ABORT"; |
