diff options
Diffstat (limited to 'qpid/java/broker/src/test')
10 files changed, 636 insertions, 64 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index d7844730d1..eb9c8653af 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; @@ -111,7 +112,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); //Create a priorityQueue _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 3d189ae6c5..89dbc4f959 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase private void verifyBrokerState() { - TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog()); + TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog()); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 9f8d5f9a99..3280516b56 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -75,7 +75,7 @@ public class AckTest extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog()); + _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog()); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index 7a944a5399..d007913a4f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -109,7 +109,7 @@ public class PersistentMessageTest extends TransientMessageTest // Check that it was not enqueued List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); - assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList); + assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty()); checkMessageMetaDataRemoved(messageId); assertEquals("Return message count not correct", 1, _returnMessages.size()); @@ -152,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest { assertNull("Message MetaData still exists for message:" + messageId, _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); - assertNull("Message still has values in the reference map:" + messageId, - _messageStore.getMessageReferenceMap(messageId)); + List ids = _messageStore.getMessageReferenceMap(messageId); + assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty()); } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f39dfe765e..d4b1de29b2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -319,7 +319,7 @@ public class SimpleAMQQueueTest extends TestCase public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); @@ -351,17 +351,17 @@ public class SimpleAMQQueueTest extends TestCase MockQueueEntry entry = new MockQueueEntry(message, _queue); entry.getQueueEntryList().add(message); entry.acquire(); - entry.dequeue(null); + entry.dequeue(new StoreContext()); // Check that it is dequeued data = _transactionLog.getMessageReferenceMap(messageId); - assertNull(data); + assertTrue(data == null || data.isEmpty()); } public void testMessagesFlowToDisk() throws AMQException, InterruptedException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); MESSAGE_SIZE = 1; long MEMORY_MAX = 500; @@ -431,7 +431,7 @@ public class SimpleAMQQueueTest extends TestCase public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); MESSAGE_SIZE = 1; /** Set to larger than the purge batch size. Default 100. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 0a30d855b3..d6e658958e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.routing.RoutingTable; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; /** @@ -48,9 +49,9 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable { } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. + return this; } public void close() throws Exception @@ -146,7 +147,7 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException { } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 456e816a52..fa5cdc1aa5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -21,155 +21,191 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.concurrent.ConcurrentMap; -/** - * Adds some extra methods to the memory message store for testing purposes. - */ +/** Adds some extra methods to the memory message store for testing purposes. */ public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable { + private TransactionLog _transactionLog; + private RoutingTable _routingTable; + private MemoryMessageStore _mms; + + public TestableMemoryMessageStore(TransactionLog log) + { + _transactionLog = log; + if (log instanceof BaseTransactionLog) + { + TransactionLog delegate = ((BaseTransactionLog) log).getDelegate(); + if (delegate instanceof RoutingTable) + { + _routingTable = (RoutingTable) delegate; + } + else + { + throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log); + } + + if (delegate instanceof MemoryMessageStore) + { + _mms = (MemoryMessageStore) delegate; + } + + } + else + { + throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log); + } - MemoryMessageStore _mms = null; + } public TestableMemoryMessageStore(MemoryMessageStore mms) { - _mms = mms; + _routingTable = mms; + _transactionLog = mms.configure(); } public TestableMemoryMessageStore() { _mms = new MemoryMessageStore(); - _mms.configure(); + _transactionLog = _mms.configure(); + _routingTable = _mms; } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() { - return _mms._metaDataMap; + return ((MemoryMessageStore) _routingTable)._metaDataMap; } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - return _mms._contentBodyMap; + return ((MemoryMessageStore) _routingTable)._contentBodyMap; } public List<AMQQueue> getMessageReferenceMap(Long messageId) { - return _mms._messageEnqueueMap.get(messageId); +// return _mms._messageEnqueueMap.get(messageId); +// ((BaseTransactionLog)_transactionLog). + return new ArrayList<AMQQueue>(); } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - _mms.configure(virtualHost,base,config); + _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config); + return _transactionLog; } public void close() throws Exception { - _mms.close(); + _transactionLog.close(); + _routingTable.close(); } public void createExchange(Exchange exchange) throws AMQException { - _mms.createExchange(exchange); + _routingTable.createExchange(exchange); } public void removeExchange(Exchange exchange) throws AMQException { - _mms.removeExchange(exchange); + _routingTable.removeExchange(exchange); } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { - _mms.bindQueue(exchange,routingKey,queue,args); + _routingTable.bindQueue(exchange, routingKey, queue, args); } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { - _mms.unbindQueue(exchange,routingKey,queue,args); + _routingTable.unbindQueue(exchange, routingKey, queue, args); } public void createQueue(AMQQueue queue) throws AMQException { - _mms.createQueue(queue); + _routingTable.createQueue(queue); } public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { - _mms.createQueue(queue,arguments); + _routingTable.createQueue(queue, arguments); } public void removeQueue(AMQQueue queue) throws AMQException { - _mms.removeQueue(queue); + _routingTable.removeQueue(queue); } - public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - _mms.enqueueMessage(context,queue,messageId); + _transactionLog.enqueueMessage(context, queues, messageId); } public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { - _mms.dequeueMessage(context,queue,messageId); + _transactionLog.dequeueMessage(context, queue, messageId); + } + + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + _transactionLog.removeMessage(context, messageId); } public void beginTran(StoreContext context) throws AMQException { - _mms.beginTran(context); + _transactionLog.beginTran(context); } public void commitTran(StoreContext context) throws AMQException { - _mms.commitTran(context); + _transactionLog.commitTran(context); } public void abortTran(StoreContext context) throws AMQException { - _mms.abortTran(context); + _transactionLog.abortTran(context); } public boolean inTran(StoreContext context) { - return _mms.inTran(context); + return _transactionLog.inTran(context); } public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { - _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody); + _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); } public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { - _mms.storeMessageMetaData(context,messageId,messageMetaData); + _transactionLog.storeMessageMetaData(context, messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + public boolean isPersistent() { - return _mms.getMessageMetaData(context,messageId); + return _transactionLog.isPersistent(); } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { - return _mms.getContentBodyChunk(context,messageId,index); + return _mms.getMessageMetaData(context, messageId); } - public boolean isPersistent() + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - return _mms.isPersistent(); + return _mms.getContentBodyChunk(context, messageId, index); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java new file mode 100644 index 0000000000..d3294d4c10 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -0,0 +1,535 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.MockContentChunk; +import org.apache.qpid.server.queue.MockPersistentAMQMessage; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class BaseTransactionLogTest extends TestCase implements TransactionLog +{ + private boolean _inTransaction; + final private Map<Long, ArrayList<AMQQueue>> _enqueues = new HashMap<Long, ArrayList<AMQQueue>>(); + final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>(); + final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>(); + + BaseTransactionLog _transactionLog; + private ArrayList<AMQQueue> _queues; + private MockPersistentAMQMessage _message; + + public void setUp() throws Exception + { + super.setUp(); + _transactionLog = new BaseTransactionLog(this); + } + + public void testSingleEnqueueNoTransactional() throws AMQException + { + //Store Data + + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + + verifyMessageStored(_message.getMessageId()); + // Enqueue + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testSingleDequeueNoTransaction() throws AMQException + { + // Enqueue a message to dequeue + testSingleEnqueueNoTransactional(); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + public void testSingleEnqueueTransactional() throws AMQException + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + //Store Data + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(context, new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(context, mpi, chb); + + _transactionLog.commitTran(context); + + verifyMessageStored(_message.getMessageId()); + + // Enqueue + _transactionLog.beginTran(context); + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + _transactionLog.enqueueMessage(context, _queues, _message.getMessageId()); + + _transactionLog.commitTran(context); + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testSingleDequeueTransaction() throws AMQException + { + // Enqueue a message to dequeue + testSingleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + + public void testMultipleEnqueueNoTransactional() throws AMQException + { + //Store Data + + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + + verifyMessageStored(_message.getMessageId()); + // Enqueue + + _queues = new ArrayList<AMQQueue>(); + + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "2"); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "3"); + _queues.add(queue); + + _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testMultipleDequeueNoTransaction() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueNoTransactional(); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId()); + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId()); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + + public void testMultipleEnqueueTransactional() throws AMQException + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + //Store Data + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(context, new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(context, mpi, chb); + + _transactionLog.commitTran(context); + + verifyMessageStored(_message.getMessageId()); + + // Enqueue + _transactionLog.beginTran(context); + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "2"); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "3"); + _queues.add(queue); + + _transactionLog.enqueueMessage(context, _queues, _message.getMessageId()); + + _transactionLog.commitTran(context); + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testMultipleDequeueMultipleTransactions() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); + + _transactionLog.commitTran(context); + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); + + _transactionLog.commitTran(context); + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + public void testMultipleDequeueSingleTransaction() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); + + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); + + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + private void verifyMessageStored(Long messageId) + { + assertTrue("MessageMD has not been stored", _storeMetaData.containsKey(messageId)); + assertTrue("Messasge Chunk has not been stored", _storeChunks.containsKey(messageId)); + } + + private void verifyEnqueuedOnQueues(Long messageId, ArrayList<AMQQueue> queues) + { + ArrayList<AMQQueue> enqueues = _enqueues.get(messageId); + + assertNotNull("Message not enqueued", enqueues); + assertEquals("Message is not enqueued on the right number of queues", queues.size(), enqueues.size()); + for (AMQQueue queue : queues) + { + assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); + } + } + + /*************************** TransactionLog ******************************* + * + * Simple InMemory TransactionLog that actually records enqueues/dequeues + */ + + /** + * @param virtualHost The virtual host using by this store + * @param base The base element identifier from which all configuration items are relative. For example, if + * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object. + * + * @return + * + * @throws Exception + */ + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + return this; + } + + public void close() throws Exception + { + } + + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException + { + for (AMQQueue queue : queues) + { + enqueueMessage(messageId, queue); + } + } + + private void enqueueMessage(Long messageId, AMQQueue queue) + { + ArrayList<AMQQueue> queues = _enqueues.get(messageId); + + if (queues == null) + { + synchronized (_enqueues) + { + queues = _enqueues.get(messageId); + if (queues == null) + { + queues = new ArrayList<AMQQueue>(); + _enqueues.put(messageId, queues); + } + } + } + + synchronized (queues) + { + queues.add(queue); + } + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + ArrayList<AMQQueue> queues = _enqueues.get(messageId); + + if (queues == null) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no enqueue data available"); + } + + synchronized (queues) + { + if (!queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no message not enqueued on queue"); + } + + queues.remove(queue); + } + } + + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + ArrayList<AMQQueue> queues; + + synchronized (_enqueues) + { + queues = _enqueues.remove(messageId); + } + + if (queues == null) + { + throw new RuntimeException("Attempt to remove message(" + messageId + ") but " + + "no enqueue data available"); + } + + if (!queues.isEmpty()) + { + throw new RuntimeException("Removed a message(" + messageId + ") that still had references."); + } + + synchronized (_storeMetaData) + { + _storeMetaData.remove(messageId); + } + + synchronized (_storeChunks) + { + _storeChunks.remove(messageId); + } + + } + + // + // This class does not attempt to operate transactionally. It only knows when it should be in a transaction. + // Data is stored immediately. + // + + public void beginTran(StoreContext context) throws AMQException + { + context.setPayload(new Object()); + } + + public void commitTran(StoreContext context) throws AMQException + { + context.setPayload(null); + } + + public void abortTran(StoreContext context) throws AMQException + { + _inTransaction = false; + } + + public boolean inTran(StoreContext context) + { + return _inTransaction; + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + ArrayList<ContentChunk> chunks = _storeChunks.get(messageId); + + if (chunks == null) + { + synchronized (_storeChunks) + { + chunks = _storeChunks.get(messageId); + if (chunks == null) + { + chunks = new ArrayList<ContentChunk>(); + _storeChunks.put(messageId, chunks); + } + } + } + + synchronized (chunks) + { + chunks.add(contentBody); + } + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + if (_storeMetaData.get(messageId) != null) + { + throw new RuntimeException("Attempt to storeMessageMetaData for messageId(" + messageId + ") but data already exists"); + } + + synchronized (_storeMetaData) + { + _storeMetaData.put(messageId, messageMetaData); + } + } + + public boolean isPersistent() + { + return false; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index 26802b4210..1210423d1b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -69,7 +69,7 @@ public class TxnBufferTest extends TestCase public void testCommitWithFailureDuringPrepare() throws AMQException { MockStore store = new MockStore(); - store.beginTran(null); + store.beginTran(new StoreContext()); TxnBuffer buffer = new TxnBuffer(); buffer.enlist(new StoreMessageOperation(store)); @@ -81,7 +81,7 @@ public class TxnBufferTest extends TestCase try { - buffer.commit(null); + buffer.commit(new StoreContext()); } catch (NoSuchElementException e) { @@ -95,7 +95,7 @@ public class TxnBufferTest extends TestCase public void testCommitWithPersistance() throws AMQException { MockStore store = new MockStore(); - store.beginTran(null); + store.beginTran(new StoreContext()); store.expectCommit(); TxnBuffer buffer = new TxnBuffer(); @@ -105,7 +105,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new TxnTester(store)); - buffer.commit(null); + buffer.commit(new StoreContext()); validateOps(); store.validate(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java index 1274b99880..ed79f1cc4f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java @@ -39,8 +39,7 @@ public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); assertNotNull(_virtualHost.getTransactionLog()); - assertNotNull(_virtualHost.getRoutingTable()); - assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable()); + assertNotNull(_virtualHost.getRoutingTable()); } catch (Exception e) { |
