diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 18:00:24 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 18:00:24 +0000 |
| commit | 29f2af3ba16e9e89600cbea3535429ed932c5c9e (patch) | |
| tree | 32842290cc8a8d97a33e63716a746bd265e85025 /qpid/java | |
| parent | e714748d137694d61c789fab97bb45f3b4941bb4 (diff) | |
| download | qpid-python-29f2af3ba16e9e89600cbea3535429ed932c5c9e.tar.gz | |
QPID-1764 : Resolved ConcurrentME. Perils of using the 'syntax sugar' for loop hides the message iterator that you need to call .remove(). Calling remove on the underlying Map will cause the resulting CME.
Merged from r761700 from trunk
QPID-1764 : Updated all tests to use the TestTransactionLog interface and split testing code into subclasses. TestableTransactionLog will now correctly wrap a TransactionLog for testing. To enable testing of the BaseTransactionLog a TestableBaseTransactionLog was needed to only return values that are actually stored in the BaseTL the TestableTransactionLog actually stores single enqueues so that they can be queried by the test.
Merged from r761741 from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-release@761742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
17 files changed, 414 insertions, 263 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index d57b81c362..f5819716cb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -51,7 +51,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class MemoryMessageStore implements TransactionLog, RoutingTable { - private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); + protected static final Logger _log = Logger.getLogger(MemoryMessageStore.class); private static final int DEFAULT_HASHTABLE_CAPACITY = 50000; @@ -154,13 +154,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - for (AMQQueue q : queues) - { - if (q.isDurable()) - { - enqueueMessage(context,q,messageId); - } - } + // Not required to do anything } public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException @@ -232,25 +226,13 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException - { - checkNotClosed(); - return _metaDataMap.get(messageId); - } - - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - checkNotClosed(); - List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - return bodyList.get(index); - } public boolean isPersistent() { return false; } - private void checkNotClosed() throws MessageStoreClosedException + protected void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index 973ecd6c09..dded7f7142 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.Iterator; public class BaseTransactionLog implements TransactionLog { @@ -80,15 +81,18 @@ public class BaseTransactionLog implements TransactionLog Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap(); //For each Message ID that is in the map check - for (Long messageID : messageMap.keySet()) + Iterator iterator = messageMap.keySet().iterator(); + + while (iterator.hasNext()) { + Long messageID = (Long) iterator.next(); //If we don't have a gloabl reference for this message then there is only a single enqueue if (_idToQueues.get(messageID) == null) { // Add the removal of the message to this transaction _delegate.removeMessage(context,messageID); // Remove this message ID as we have processed it so we don't reprocess after the main commmit - messageMap.remove(messageID); + iterator.remove(); } } } @@ -179,6 +183,15 @@ public class BaseTransactionLog implements TransactionLog } else { + //When a message is on more than one queue it is possible that this code section is exectuted + // by one thread per enqueue. + // It is however, thread safe because there is only removes being performed and so the + // last thread that does the remove will see the empty queue and remove the message + // At this stage there is nothing that is going to cause this operation to abort. So we don't + // need to worry about any potential adds. + // The message will no longer be enqueued as that operation has been committed before now so + // this is clean up of the data. + // Update the enqueued list enqueuedList.remove(queue); @@ -195,6 +208,8 @@ public class BaseTransactionLog implements TransactionLog //Commit the removes on the delegate. _delegate.commitTran(removeContext); + // Mark this context as committed. + removeContext.commitTransaction(); } finally { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 52b8b0ad19..e034143596 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -103,7 +103,7 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; - private TransactionLog _transactionLog = new TestableMemoryMessageStore(); + private TransactionLog _transactionLog = new TestableMemoryMessageStore().configure(); private static final int MESSAGE_SIZE=100; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 89dbc4f959..6ae2324e5f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactory; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -43,14 +44,13 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; /** * Test class to test AMQQueueMBean attribtues and operations @@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase public void testMessageCountTransient() throws Exception { int messageCount = 10; - sendMessages(messageCount, false); + List<AMQMessage> messages = sendMessages(messageCount, false); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE); @@ -85,13 +85,13 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); //Ensure that the data has been removed from the Store - verifyBrokerState(); + verifyBrokerState(messages); } public void testMessageCountPersistent() throws Exception { int messageCount = 10; - sendMessages(messageCount, true); + List<AMQMessage> messages = sendMessages(messageCount, true); assertEquals("", messageCount, _queueMBean.getMessageCount().intValue()); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE); @@ -106,20 +106,38 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); //Ensure that the data has been removed from the Store - verifyBrokerState(); + verifyBrokerState(messages); } // todo: collect to a general testing class -duplicated from Systest/MessageReturntest - private void verifyBrokerState() + private void verifyBrokerState(List<AMQMessage> messages) { - TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog()); + TestableTransactionLog store = new TestableTransactionLog(_virtualHost.getTransactionLog()); - // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. - assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); - assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); - assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); - assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); + // We can only now check messageData and ConentBodyChunks by MessageID. + for (AMQMessage message : messages) + { + // Check we have no message metadata for the messages we sent + try + { + assertNull(store.getMessageMetaData(new StoreContext(), message.getMessageId())); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + try + { + assertNull(store.getContentBodyChunk(new StoreContext(), message.getMessageId(),0)); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } } public void testConsumerCount() throws AMQException @@ -297,8 +315,9 @@ public class AMQQueueMBeanTest extends TestCase ApplicationRegistry.remove(1); } - private void sendMessages(int messageCount, boolean persistent) throws AMQException + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { + List<AMQMessage> messages = new LinkedList<AMQMessage>(); for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); @@ -316,9 +335,10 @@ public class AMQQueueMBeanTest extends TestCase .convertToContentChunk( new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), MESSAGE_SIZE))); - currentMessage.deliverToQueues(); + messages.add(currentMessage.deliverToQueues()); } + return messages; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 3280516b56..58073e52b6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -38,8 +39,6 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -47,6 +46,7 @@ import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.ArrayList; import java.util.LinkedList; import java.util.Set; +import java.util.List; /** * Tests that acknowledgements are handled correctly. @@ -59,7 +59,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestableMemoryMessageStore _messageStore; + private TestableTransactionLog _transactionLog; private StoreContext _storeContext = new StoreContext(); @@ -75,9 +75,9 @@ public class AckTest extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog()); - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); + _transactionLog = new TestableTransactionLog(vhost.getTransactionLog()); + _protocolSession = new MockProtocolSession(_transactionLog); + _channel = new AMQChannel(_protocolSession,5, _transactionLog /*dont need exchange registry*/); _protocolSession.addChannel(_channel); @@ -95,13 +95,13 @@ public class AckTest extends TestCase publishMessages(count, false); } - private void publishMessages(int count, boolean persistent) throws AMQException + private List<AMQMessage> publishMessages(int count, boolean persistent) throws AMQException { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, + TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageFactory factory = MessageFactory.getInstance(); + List<AMQMessage> sentMessages = new LinkedList<AMQMessage>(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -109,7 +109,7 @@ public class AckTest extends TestCase MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false, false, new AMQShortString("rk")); - IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore); + IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _transactionLog); //IncomingMessage msg2 = null; if (persistent) { @@ -130,14 +130,16 @@ public class AckTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore); + msg.routingComplete(_transactionLog); if(msg.allContentReceived()) { - msg.deliverToQueues(); + sentMessages.add(msg.deliverToQueues()); } // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } + + return sentMessages; } /** @@ -148,11 +150,16 @@ public class AckTest extends TestCase { _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; - publishMessages(msgCount, true); + List<AMQMessage> sentMessages = publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); + for (AMQMessage message : sentMessages) + { + List<AMQQueue> enqueuedQueues = _transactionLog.getMessageReferenceMap(message.getMessageId()); + assertNotNull("Expected message to be enqueued",enqueuedQueues); + assertEquals("Message is not enqueued on expected number of queues.",1, enqueuedQueues.size()); + } Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -165,7 +172,6 @@ public class AckTest extends TestCase } assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); } /** @@ -180,8 +186,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize()); +// assertTrue(_messageStore.getContentBodyMap().size() == 0);to be } @@ -197,8 +203,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize()); +// assertTrue(_messageStore.getContentBodyMap().size() == 0); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index d007913a4f..4b4c404229 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -31,10 +32,10 @@ import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.commons.configuration.PropertiesConfiguration; import java.util.ArrayList; import java.util.LinkedList; @@ -42,7 +43,7 @@ import java.util.List; public class PersistentMessageTest extends TransientMessageTest { - private TestableMemoryMessageStore _messageStore; + private TestableTransactionLog _transactionLog; protected SimpleAMQQueue _queue; protected AMQShortString _q1name = new AMQShortString("q1name"); @@ -54,22 +55,22 @@ public class PersistentMessageTest extends TransientMessageTest public void setUp() throws Exception { - _messageStore = new TestableMemoryMessageStore(); + _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); _storeContext = new StoreContext(); VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration(PersistentMessageTest.class.getName(), new PropertiesConfiguration()), - _messageStore); + _transactionLog); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null); // Create IncomingMessage and nondurable queue - _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages); + _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, _returnMessages); } @Override protected AMQMessage newMessage() { - return MessageFactory.getInstance().createMessage(_messageStore, true); + return MessageFactory.getInstance().createMessage(_transactionLog, true); } @Override @@ -82,7 +83,7 @@ public class PersistentMessageTest extends TransientMessageTest /** * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right - * contents. TransactionLog = Empty, returnMessages 1 item. + * contents. TransactionLog = Empty, returnMessages 1 item. * * @throws Exception */ @@ -98,17 +99,16 @@ public class PersistentMessageTest extends TransientMessageTest // equivalent to amqChannel.routeMessage() msg.enqueue(qs); - msg.routingComplete(_messageStore); + msg.routingComplete(_transactionLog); // equivalent to amqChannel.deliverCurrentMessageIfComplete msg.deliverToQueues(); // Check that data has been stored to disk long messageId = msg.getMessageId(); - checkMessageMetaDataExists(messageId); // Check that it was not enqueued - List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); + List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId); assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty()); checkMessageMetaDataRemoved(messageId); @@ -118,7 +118,7 @@ public class PersistentMessageTest extends TransientMessageTest protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException { IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext, - new MockProtocolSession(_messageStore), _messageStore); + new MockProtocolSession(_transactionLog), _transactionLog); // equivalent to amqChannel.publishContenHeader ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); @@ -138,7 +138,8 @@ public class PersistentMessageTest extends TransientMessageTest { try { - _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId); + assertNotNull("Message MetaData does not exist for message:" + messageId, + _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); } catch (AMQException amqe) { @@ -151,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest try { assertNull("Message MetaData still exists for message:" + messageId, - _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); - List ids = _messageStore.getMessageReferenceMap(messageId); + _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); + List ids = _transactionLog.getMessageReferenceMap(messageId); assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index d5e873ebc0..4e7bad06ae 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -35,12 +35,13 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestTransactionLog; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import java.util.ArrayList; import java.util.List; @@ -50,7 +51,7 @@ public class SimpleAMQQueueTest extends TestCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore(); + protected TestableTransactionLog _transactionLog; protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -68,6 +69,7 @@ public class SimpleAMQQueueTest extends TestCase //Create Application Registry for test ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1); + _transactionLog = new TestableTransactionLog(new MemoryMessageStore().configure()); PropertiesConfiguration env = new PropertiesConfiguration(); _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); @@ -340,7 +342,9 @@ public class SimpleAMQQueueTest extends TestCase // Check that it is enqueued List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId); - assertNotNull(data); + assertNotNull("Message has no enqueued information.", data); + assertTrue("Message is not enqueued on correct queue.", data.contains(_queue)); + assertEquals("Message not enqueued on the right queues.", 1, data.size()); // Dequeue message ContentHeaderBody header = new ContentHeaderBody(); @@ -355,7 +359,7 @@ public class SimpleAMQQueueTest extends TestCase // Check that it is dequeued data = _transactionLog.getMessageReferenceMap(messageId); - assertTrue(data == null || data.isEmpty()); + assertNull("Message still has enqueue data.", data); } public void testMessagesFlowToDisk() throws AMQException, InterruptedException @@ -509,7 +513,9 @@ public class SimpleAMQQueueTest extends TestCase //Check message was correctly enqueued List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId); - assertNotNull(data); + assertNotNull("Message has no enqueued information.", data); + assertTrue("Message is not enqueued on correct queue.", data.contains(_queue)); + assertEquals("Message not enqueued on the right queues.", 1, data.size()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index abcd9855d9..3a4746eb2c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockProtocolSession; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase @@ -66,7 +66,7 @@ public class ACLManagerTest extends TestCase _pluginManager = new MockPluginManager(""); _authzManager = new ACLManager(_conf, _pluginManager); - _session = new MockProtocolSession(new TestableMemoryMessageStore()); + _session = new MockProtocolSession(new MemoryMessageStore().configure()); } public void tearDown() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index ff1fb8c97d..251f6d45f7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -37,7 +37,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.TestIoSession; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; -import org.apache.qpid.server.store.TestableMemoryMessageStore; + import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -81,14 +81,12 @@ public class FirewallPluginTest extends TestCase } } - private TestableMemoryMessageStore _store; private VirtualHost _virtualHost; private AMQMinaProtocolSession _session; @Override public void setUp() throws Exception { - _store = new TestableMemoryMessageStore(); PropertiesConfiguration env = new PropertiesConfiguration(); _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); TestIoSession iosession = new TestIoSession(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 5a4c435e59..4c03a57cc8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -26,28 +26,24 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageFactory; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ +/** Tests that reference counting works correctly with AMQMessage and the message store */ public class TestReferenceCounting extends TestCase { - private TestableMemoryMessageStore _store; + private TestableTransactionLog _store; private StoreContext _storeContext = new StoreContext(); - protected void setUp() throws Exception { super.setUp(); - _store = new TestableMemoryMessageStore(); + _store = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); } - /** - * Check that when the reference count is decremented the message removes itself from the store - */ + /** Check that when the reference count is decremented the message removes itself from the store */ public void testMessageGetsRemoved() throws AMQException { ContentHeaderBody chb = createPersistentContentHeader(); @@ -57,14 +53,15 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - assertEquals(1, _store.getMessageMetaDataMap().size()); + assertNotNull("Message Metadata did not exist for new message", + _store.getMessageMetaData(new StoreContext(), message.getMessageId())); } private ContentHeaderBody createPersistentContentHeader() { ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); - bchp.setDeliveryMode((byte)2); + bchp.setDeliveryMode((byte) 2); chb.properties = bchp; return chb; } @@ -77,8 +74,9 @@ public class TestReferenceCounting extends TestCase final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - - assertEquals(1, _store.getMessageMetaDataMap().size()); + + assertNotNull("Message Metadata did not exist for new message", + _store.getMessageMetaData(new StoreContext(), message.getMessageId())); } public static junit.framework.Test suite() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java index 38d139e94c..5d0fdfb727 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -21,12 +21,23 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; import java.util.Map; import java.util.List; public interface TestTransactionLog extends TransactionLog { + public void setBaseTransactionLog(BaseTransactionLog base); + public List<AMQQueue> getMessageReferenceMap(Long messageID); + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; + public long getMessageMetaDataSize(); + public TransactionLog getDelegate(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index fa5cdc1aa5..2099181a76 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,192 +20,80 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.transactionlog.BaseTransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentMap; /** Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable +public class TestableMemoryMessageStore extends MemoryMessageStore implements TestTransactionLog { - private TransactionLog _transactionLog; - private RoutingTable _routingTable; - private MemoryMessageStore _mms; + private TestableTransactionLog _base; - public TestableMemoryMessageStore(TransactionLog log) + public void setBaseTransactionLog(BaseTransactionLog base) { - _transactionLog = log; - if (log instanceof BaseTransactionLog) + if (!(base instanceof TestableTransactionLog)) { - TransactionLog delegate = ((BaseTransactionLog) log).getDelegate(); - if (delegate instanceof RoutingTable) - { - _routingTable = (RoutingTable) delegate; - } - else - { - throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log); - } - - if (delegate instanceof MemoryMessageStore) - { - _mms = (MemoryMessageStore) delegate; - } - - } - else - { - throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log); + throw new RuntimeException("base must be a TestableTransactionLog for correct operation in a TestMemoryMessageStore"); } + _base = (TestableTransactionLog) base; } - public TestableMemoryMessageStore(MemoryMessageStore mms) - { - _routingTable = mms; - _transactionLog = mms.configure(); - } - - public TestableMemoryMessageStore() - { - _mms = new MemoryMessageStore(); - _transactionLog = _mms.configure(); - _routingTable = _mms; - } - - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() - { - return ((MemoryMessageStore) _routingTable)._metaDataMap; - } - - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - return ((MemoryMessageStore) _routingTable)._contentBodyMap; - } - - public List<AMQQueue> getMessageReferenceMap(Long messageId) - { -// return _mms._messageEnqueueMap.get(messageId); -// ((BaseTransactionLog)_transactionLog). - return new ArrayList<AMQQueue>(); - } - - public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + @Override + public TransactionLog configure() { - _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config); - return _transactionLog; - } + BaseTransactionLog base = (BaseTransactionLog) super.configure(); - public void close() throws Exception - { - _transactionLog.close(); - _routingTable.close(); - } + _base = new TestableTransactionLog(base.getDelegate()); - public void createExchange(Exchange exchange) throws AMQException - { - _routingTable.createExchange(exchange); + return _base; } - public void removeExchange(Exchange exchange) throws AMQException + @Override + public TransactionLog configure(String base, VirtualHostConfiguration config) { - _routingTable.removeExchange(exchange); - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - _routingTable.bindQueue(exchange, routingKey, queue, args); - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - _routingTable.unbindQueue(exchange, routingKey, queue, args); - } - - public void createQueue(AMQQueue queue) throws AMQException - { - _routingTable.createQueue(queue); - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException - { - _routingTable.createQueue(queue, arguments); - } - - public void removeQueue(AMQQueue queue) throws AMQException - { - _routingTable.removeQueue(queue); - } - - public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException - { - _transactionLog.enqueueMessage(context, queues, messageId); - } - - public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException - { - _transactionLog.dequeueMessage(context, queue, messageId); - } - - public void removeMessage(StoreContext context, Long messageId) throws AMQException - { - _transactionLog.removeMessage(context, messageId); - } - - public void beginTran(StoreContext context) throws AMQException - { - _transactionLog.beginTran(context); - } + //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable' + if (base.equals("store")) + { + super.configure(); - public void commitTran(StoreContext context) throws AMQException - { - _transactionLog.commitTran(context); - } + _base = new TestableTransactionLog(this); - public void abortTran(StoreContext context) throws AMQException - { - _transactionLog.abortTran(context); - } + return _base; + } - public boolean inTran(StoreContext context) - { - return _transactionLog.inTran(context); + return super.configure(); } - public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + public List<AMQQueue> getMessageReferenceMap(Long messageId) { - _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + return _base.getMessageReferenceMap(messageId); } - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) { - _transactionLog.storeMessageMetaData(context, messageId, messageMetaData); + return _metaDataMap.get(messageId); } - public boolean isPersistent() + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) { - return _transactionLog.isPersistent(); + List<ContentChunk> bodyList = _contentBodyMap.get(messageId); + return bodyList.get(index); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + public long getMessageMetaDataSize() { - return _mms.getMessageMetaData(context, messageId); + return _metaDataMap.size(); } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public TransactionLog getDelegate() { - return _mms.getContentBodyChunk(context, messageId, index); + return _base; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java index 0a2a1c2327..a0c38ff0ad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockContentChunk; import org.apache.qpid.server.queue.MockPersistentAMQMessage; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -47,14 +48,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>(); final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>(); - TestableTransactionLog _transactionLog; + TestTransactionLog _transactionLog; private ArrayList<AMQQueue> _queues; private MockPersistentAMQMessage _message; public void setUp() throws Exception { super.setUp(); - _transactionLog = new TestableTransactionLog(this); + _transactionLog = new TestableBaseTransactionLog(this); } public void testSingleEnqueueNoTransactional() throws AMQException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java new file mode 100644 index 0000000000..92bc44da0b --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.List; + +public class TestableBaseTransactionLog extends BaseTransactionLog implements TestTransactionLog +{ + + public TestableBaseTransactionLog() + { + super(null); + } + + public TestableBaseTransactionLog(TransactionLog delegate) + { + super(delegate); + if (delegate instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) delegate).getDelegate(); + } + + } + + @Override + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + if (_delegate != null) + { + TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config); + + // Unwrap any BaseTransactionLog + if (configuredLog instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) configuredLog).getDelegate(); + } + } + else + { + String delegateClass = config.getStoreConfiguration().getString("delegate"); + Class clazz = Class.forName(delegateClass); + Object o = clazz.newInstance(); + + if (!(o instanceof TransactionLog)) + { + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + + " does not."); + } + _delegate = (TransactionLog) o; + + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _delegate.configure(virtualHost, base, config); + } + return this; + } + + public void setBaseTransactionLog(BaseTransactionLog base) + { + throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs"); + } + + public List<AMQQueue> getMessageReferenceMap(Long messageID) + { + return _idToQueues.get(messageID); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId); + } + else + { + return null; + } + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index); + } + else + { + return null; + } + } + + public long getMessageMetaDataSize() + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaDataSize(); + } + else + { + return 0; + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java index b0c47052b2..38e17c3a07 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java @@ -20,52 +20,95 @@ */ package org.apache.qpid.server.transactionlog; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.LinkedList; +import java.util.Map; public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog { - - List<Long> _singleEnqueues = new LinkedList<Long>(); + protected Map<Long, List<AMQQueue>> _singleEnqueuedIDstoQueue = new HashMap<Long, List<AMQQueue>>(); public TestableTransactionLog() { super(null); } - public TestableTransactionLog(BaseTransactionLog delegate) + public TestableTransactionLog(TransactionLog delegate) { - super(delegate.getDelegate()); + super(delegate); + if (delegate instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) delegate).getDelegate(); + } + } - public TestableTransactionLog(TransactionLog delegate) + /** + * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting + * + * @param context The transactional context for the operation. + * @param queues + * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException + * + * @throws AMQException + */ + @Override + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - super(delegate); + if (queues.size() == 1) + { + _singleEnqueuedIDstoQueue.put(messageId, queues); + } + + super.enqueueMessage(context, queues, messageId); } + /** + * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting + * + * @param context The transactional context for the operation. + * @param queue + * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException + * + * @throws AMQException + */ + @Override + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + if (_singleEnqueuedIDstoQueue.containsKey(messageId)) + { + _singleEnqueuedIDstoQueue.remove(messageId); + } + + super.dequeueMessage(context, queue, messageId); + } @Override public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { if (_delegate != null) { - TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config); + TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config); // Unwrap any BaseTransactionLog if (configuredLog instanceof BaseTransactionLog) { - _delegate = ((BaseTransactionLog)configuredLog).getDelegate(); + _delegate = ((BaseTransactionLog) configuredLog).getDelegate(); } } else { - String delegateClass = config.getStoreConfiguration().getString("delegate"); + String delegateClass = config.getStoreConfiguration().getString("delegate"); Class clazz = Class.forName(delegateClass); Object o = clazz.newInstance(); @@ -77,13 +120,61 @@ public class TestableTransactionLog extends BaseTransactionLog implements TestTr _delegate = (TransactionLog) o; // If a TransactionLog uses the BaseTransactionLog then it will return this object. - _delegate.configure(virtualHost, base, config); + _delegate.configure(virtualHost, base, config); } return this; } + public void setBaseTransactionLog(BaseTransactionLog base) + { + throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs"); + } + public List<AMQQueue> getMessageReferenceMap(Long messageID) { - return _idToQueues.get(messageID); + List<AMQQueue> result = _idToQueues.get(messageID); + + if (result == null) + { + result = _singleEnqueuedIDstoQueue.get(messageID); + } + + return result; + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId); + } + else + { + return null; + } + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index); + } + else + { + return null; + } + } + + public long getMessageMetaDataSize() + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaDataSize(); + } + else + { + return 0; + } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index cdc7eabf04..dbd05b9598 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -34,8 +34,10 @@ import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -62,7 +64,11 @@ public class InternalBrokerBaseCase extends TestCase { super.setUp(); PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + // This configuration is not used as TestApplicationRegistry just creates a single vhost 'test' with + // TransactionLog TestableTransactionLog(TestMemoryMessageStore) + configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableTransactionLog.class.getName()); + configuration.setProperty("virtualhosts.virtualhost.test.store.delegate", TestableMemoryMessageStore.class.getName()); + _registry = new TestApplicationRegistry(new ServerConfiguration(configuration)); ApplicationRegistry.initialise(_registry); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); @@ -96,7 +102,7 @@ public class InternalBrokerBaseCase extends TestCase protected void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestTransactionLog) _transactionLog).getMessageMetaDataSize()); //The above publish message is sufficiently small not to fit in the header so no Body is required. //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 22bd3b5aab..8c2508b8f4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.util; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -38,9 +37,9 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import java.util.Collection; -import java.util.HashMap; import java.util.Properties; import java.util.Arrays; @@ -83,7 +82,7 @@ public class TestApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); - _transactionLog = new TestableMemoryMessageStore(); + _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); _virtualHostRegistry = new VirtualHostRegistry(); |
