summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java124
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java535
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java3
10 files changed, 636 insertions, 64 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index d7844730d1..eb9c8653af 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
@@ -111,7 +112,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
//Create a priorityQueue
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 3d189ae6c5..89dbc4f959 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog());
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 9f8d5f9a99..3280516b56 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -75,7 +75,7 @@ public class AckTest extends TestCase
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
+ _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index 7a944a5399..d007913a4f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -109,7 +109,7 @@ public class PersistentMessageTest extends TransientMessageTest
// Check that it was not enqueued
List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
- assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+ assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
checkMessageMetaDataRemoved(messageId);
assertEquals("Return message count not correct", 1, _returnMessages.size());
@@ -152,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest
{
assertNull("Message MetaData still exists for message:" + messageId,
_messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
- assertNull("Message still has values in the reference map:" + messageId,
- _messageStore.getMessageReferenceMap(messageId));
+ List ids = _messageStore.getMessageReferenceMap(messageId);
+ assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f39dfe765e..d4b1de29b2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -319,7 +319,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -351,17 +351,17 @@ public class SimpleAMQQueueTest extends TestCase
MockQueueEntry entry = new MockQueueEntry(message, _queue);
entry.getQueueEntryList().add(message);
entry.acquire();
- entry.dequeue(null);
+ entry.dequeue(new StoreContext());
// Check that it is dequeued
data = _transactionLog.getMessageReferenceMap(messageId);
- assertNull(data);
+ assertTrue(data == null || data.isEmpty());
}
public void testMessagesFlowToDisk() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
MESSAGE_SIZE = 1;
long MEMORY_MAX = 500;
@@ -431,7 +431,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
MESSAGE_SIZE = 1;
/** Set to larger than the purge batch size. Default 100.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 0a30d855b3..d6e658958e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.routing.RoutingTable;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -48,9 +49,9 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable
{
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+ return this;
}
public void close() throws Exception
@@ -146,7 +147,7 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 456e816a52..fa5cdc1aa5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -21,155 +21,191 @@
package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
+/** Adds some extra methods to the memory message store for testing purposes. */
public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
{
+ private TransactionLog _transactionLog;
+ private RoutingTable _routingTable;
+ private MemoryMessageStore _mms;
+
+ public TestableMemoryMessageStore(TransactionLog log)
+ {
+ _transactionLog = log;
+ if (log instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ else
+ {
+ throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
+ }
+
+ if (delegate instanceof MemoryMessageStore)
+ {
+ _mms = (MemoryMessageStore) delegate;
+ }
+
+ }
+ else
+ {
+ throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+ }
- MemoryMessageStore _mms = null;
+ }
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
- _mms = mms;
+ _routingTable = mms;
+ _transactionLog = mms.configure();
}
public TestableMemoryMessageStore()
{
_mms = new MemoryMessageStore();
- _mms.configure();
+ _transactionLog = _mms.configure();
+ _routingTable = _mms;
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- return _mms._metaDataMap;
+ return ((MemoryMessageStore) _routingTable)._metaDataMap;
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- return _mms._contentBodyMap;
+ return ((MemoryMessageStore) _routingTable)._contentBodyMap;
}
public List<AMQQueue> getMessageReferenceMap(Long messageId)
{
- return _mms._messageEnqueueMap.get(messageId);
+// return _mms._messageEnqueueMap.get(messageId);
+// ((BaseTransactionLog)_transactionLog).
+ return new ArrayList<AMQQueue>();
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- _mms.configure(virtualHost,base,config);
+ _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
+ return _transactionLog;
}
public void close() throws Exception
{
- _mms.close();
+ _transactionLog.close();
+ _routingTable.close();
}
public void createExchange(Exchange exchange) throws AMQException
{
- _mms.createExchange(exchange);
+ _routingTable.createExchange(exchange);
}
public void removeExchange(Exchange exchange) throws AMQException
{
- _mms.removeExchange(exchange);
+ _routingTable.removeExchange(exchange);
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
- _mms.bindQueue(exchange,routingKey,queue,args);
+ _routingTable.bindQueue(exchange, routingKey, queue, args);
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
- _mms.unbindQueue(exchange,routingKey,queue,args);
+ _routingTable.unbindQueue(exchange, routingKey, queue, args);
}
public void createQueue(AMQQueue queue) throws AMQException
{
- _mms.createQueue(queue);
+ _routingTable.createQueue(queue);
}
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- _mms.createQueue(queue,arguments);
+ _routingTable.createQueue(queue, arguments);
}
public void removeQueue(AMQQueue queue) throws AMQException
{
- _mms.removeQueue(queue);
+ _routingTable.removeQueue(queue);
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- _mms.enqueueMessage(context,queue,messageId);
+ _transactionLog.enqueueMessage(context, queues, messageId);
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
- _mms.dequeueMessage(context,queue,messageId);
+ _transactionLog.dequeueMessage(context, queue, messageId);
+ }
+
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ _transactionLog.removeMessage(context, messageId);
}
public void beginTran(StoreContext context) throws AMQException
{
- _mms.beginTran(context);
+ _transactionLog.beginTran(context);
}
public void commitTran(StoreContext context) throws AMQException
{
- _mms.commitTran(context);
+ _transactionLog.commitTran(context);
}
public void abortTran(StoreContext context) throws AMQException
{
- _mms.abortTran(context);
+ _transactionLog.abortTran(context);
}
public boolean inTran(StoreContext context)
{
- return _mms.inTran(context);
+ return _transactionLog.inTran(context);
}
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
- _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+ _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
}
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
- _mms.storeMessageMetaData(context,messageId,messageMetaData);
+ _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ public boolean isPersistent()
{
- return _mms.getMessageMetaData(context,messageId);
+ return _transactionLog.isPersistent();
}
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
- return _mms.getContentBodyChunk(context,messageId,index);
+ return _mms.getMessageMetaData(context, messageId);
}
- public boolean isPersistent()
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- return _mms.isPersistent();
+ return _mms.getContentBodyChunk(context, messageId, index);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
new file mode 100644
index 0000000000..d3294d4c10
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
@@ -0,0 +1,535 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class BaseTransactionLogTest extends TestCase implements TransactionLog
+{
+ private boolean _inTransaction;
+ final private Map<Long, ArrayList<AMQQueue>> _enqueues = new HashMap<Long, ArrayList<AMQQueue>>();
+ final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
+ final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
+
+ BaseTransactionLog _transactionLog;
+ private ArrayList<AMQQueue> _queues;
+ private MockPersistentAMQMessage _message;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _transactionLog = new BaseTransactionLog(this);
+ }
+
+ public void testSingleEnqueueNoTransactional() throws AMQException
+ {
+ //Store Data
+
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+ verifyMessageStored(_message.getMessageId());
+ // Enqueue
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testSingleDequeueNoTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testSingleEnqueueNoTransactional();
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ public void testSingleEnqueueTransactional() throws AMQException
+ {
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ //Store Data
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+ _transactionLog.commitTran(context);
+
+ verifyMessageStored(_message.getMessageId());
+
+ // Enqueue
+ _transactionLog.beginTran(context);
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testSingleDequeueTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testSingleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+
+ public void testMultipleEnqueueNoTransactional() throws AMQException
+ {
+ //Store Data
+
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+ verifyMessageStored(_message.getMessageId());
+ // Enqueue
+
+ _queues = new ArrayList<AMQQueue>();
+
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "2");
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "3");
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testMultipleDequeueNoTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueNoTransactional();
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+
+ public void testMultipleEnqueueTransactional() throws AMQException
+ {
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ //Store Data
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+ _transactionLog.commitTran(context);
+
+ verifyMessageStored(_message.getMessageId());
+
+ // Enqueue
+ _transactionLog.beginTran(context);
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "2");
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "3");
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testMultipleDequeueMultipleTransactions() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ public void testMultipleDequeueSingleTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ private void verifyMessageStored(Long messageId)
+ {
+ assertTrue("MessageMD has not been stored", _storeMetaData.containsKey(messageId));
+ assertTrue("Messasge Chunk has not been stored", _storeChunks.containsKey(messageId));
+ }
+
+ private void verifyEnqueuedOnQueues(Long messageId, ArrayList<AMQQueue> queues)
+ {
+ ArrayList<AMQQueue> enqueues = _enqueues.get(messageId);
+
+ assertNotNull("Message not enqueued", enqueues);
+ assertEquals("Message is not enqueued on the right number of queues", queues.size(), enqueues.size());
+ for (AMQQueue queue : queues)
+ {
+ assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+ }
+ }
+
+ /*************************** TransactionLog *******************************
+ *
+ * Simple InMemory TransactionLog that actually records enqueues/dequeues
+ */
+
+ /**
+ * @param virtualHost The virtual host using by this store
+ * @param base The base element identifier from which all configuration items are relative. For example, if
+ * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object.
+ *
+ * @return
+ *
+ * @throws Exception
+ */
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ return this;
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ for (AMQQueue queue : queues)
+ {
+ enqueueMessage(messageId, queue);
+ }
+ }
+
+ private void enqueueMessage(Long messageId, AMQQueue queue)
+ {
+ ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+ if (queues == null)
+ {
+ synchronized (_enqueues)
+ {
+ queues = _enqueues.get(messageId);
+ if (queues == null)
+ {
+ queues = new ArrayList<AMQQueue>();
+ _enqueues.put(messageId, queues);
+ }
+ }
+ }
+
+ synchronized (queues)
+ {
+ queues.add(queue);
+ }
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no enqueue data available");
+ }
+
+ synchronized (queues)
+ {
+ if (!queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no message not enqueued on queue");
+ }
+
+ queues.remove(queue);
+ }
+ }
+
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ ArrayList<AMQQueue> queues;
+
+ synchronized (_enqueues)
+ {
+ queues = _enqueues.remove(messageId);
+ }
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Attempt to remove message(" + messageId + ") but " +
+ "no enqueue data available");
+ }
+
+ if (!queues.isEmpty())
+ {
+ throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
+ }
+
+ synchronized (_storeMetaData)
+ {
+ _storeMetaData.remove(messageId);
+ }
+
+ synchronized (_storeChunks)
+ {
+ _storeChunks.remove(messageId);
+ }
+
+ }
+
+ //
+ // This class does not attempt to operate transactionally. It only knows when it should be in a transaction.
+ // Data is stored immediately.
+ //
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ context.setPayload(new Object());
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ context.setPayload(null);
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ _inTransaction = false;
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _inTransaction;
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ ArrayList<ContentChunk> chunks = _storeChunks.get(messageId);
+
+ if (chunks == null)
+ {
+ synchronized (_storeChunks)
+ {
+ chunks = _storeChunks.get(messageId);
+ if (chunks == null)
+ {
+ chunks = new ArrayList<ContentChunk>();
+ _storeChunks.put(messageId, chunks);
+ }
+ }
+ }
+
+ synchronized (chunks)
+ {
+ chunks.add(contentBody);
+ }
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ if (_storeMetaData.get(messageId) != null)
+ {
+ throw new RuntimeException("Attempt to storeMessageMetaData for messageId(" + messageId + ") but data already exists");
+ }
+
+ synchronized (_storeMetaData)
+ {
+ _storeMetaData.put(messageId, messageMetaData);
+ }
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 26802b4210..1210423d1b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -69,7 +69,7 @@ public class TxnBufferTest extends TestCase
public void testCommitWithFailureDuringPrepare() throws AMQException
{
MockStore store = new MockStore();
- store.beginTran(null);
+ store.beginTran(new StoreContext());
TxnBuffer buffer = new TxnBuffer();
buffer.enlist(new StoreMessageOperation(store));
@@ -81,7 +81,7 @@ public class TxnBufferTest extends TestCase
try
{
- buffer.commit(null);
+ buffer.commit(new StoreContext());
}
catch (NoSuchElementException e)
{
@@ -95,7 +95,7 @@ public class TxnBufferTest extends TestCase
public void testCommitWithPersistance() throws AMQException
{
MockStore store = new MockStore();
- store.beginTran(null);
+ store.beginTran(new StoreContext());
store.expectCommit();
TxnBuffer buffer = new TxnBuffer();
@@ -105,7 +105,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new TxnTester(store));
- buffer.commit(null);
+ buffer.commit(new StoreContext());
validateOps();
store.validate();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
index 1274b99880..ed79f1cc4f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
@@ -39,8 +39,7 @@ public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase
_virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
assertNotNull(_virtualHost.getTransactionLog());
- assertNotNull(_virtualHost.getRoutingTable());
- assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable());
+ assertNotNull(_virtualHost.getRoutingTable());
}
catch (Exception e)
{