summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java121
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java234
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java81
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java124
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java535
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java52
20 files changed, 1124 insertions, 176 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5eafd281c0..bab19fbc54 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -136,11 +136,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if(_destinationQueues != null)
{
- for (int i = 0; i < _destinationQueues.size(); i++)
- {
- transactionLog.enqueueMessage(_txnContext.getStoreContext(),
- _destinationQueues.get(i), getMessageId());
- }
+ transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId());
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ed9b1eb8d7..e5898ceda9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -849,7 +849,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+ //FIXME
+ //fixme
+ ArrayList list = new ArrayList();
+ list.add(toQueue);
+ transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
@@ -941,10 +945,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
+ //fixme
+ //FIXME
+ ArrayList list = new ArrayList();
+ list.add(toQueue);
+ transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
}
+
// Commit and flush the move transcations.
try
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
index 0c62638710..883a41b55f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
@@ -42,7 +42,7 @@ public interface RoutingTable
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+ Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 33b3d8608e..157418d806 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +42,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -143,7 +143,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
private State _state = State.INITIAL;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
//Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
if (base.equals("store"))
@@ -178,7 +178,9 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
recover();
stateTransition(State.RECOVERING, State.STARTED);
+ return new BaseTransactionLog(this);
}
+ return null;
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -825,7 +827,18 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ for (AMQQueue q : queues)
+ {
+ if (q.isDurable())
+ {
+ enqueueMessage(context,q,messageId);
+ }
+ }
+ }
+
+ void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
AMQShortString name = queue.getName();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 3754b41a3e..d57b81c362 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -30,17 +29,14 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,16 +63,16 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
- protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
- public void configure()
+ public TransactionLog configure()
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
+ return new BaseTransactionLog(this);
}
- public void configure(String base, VirtualHostConfiguration config)
+ public TransactionLog configure(String base, VirtualHostConfiguration config)
{
//Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
if (base.equals("store"))
@@ -85,12 +81,14 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
_log.info("Using capacity " + hashtableCapacity + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ return new BaseTransactionLog(this);
}
+ return null;
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- configure(base, config);
+ return configure(base, config);
}
public void close() throws Exception
@@ -108,7 +106,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
}
- private void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
checkNotClosed();
if (_log.isDebugEnabled())
@@ -117,7 +115,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
_metaDataMap.remove(messageId);
_contentBodyMap.remove(messageId);
- _messageEnqueueMap.remove(messageId);
}
public void createExchange(Exchange exchange) throws AMQException
@@ -155,41 +152,25 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- synchronized (_messageEnqueueMap)
+ for (AMQQueue q : queues)
{
- List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
- if (queues == null)
+ if (q.isDurable())
{
- queues = new LinkedList<AMQQueue>();
- _messageEnqueueMap.put(messageId, queues);
+ enqueueMessage(context,q,messageId);
}
-
- queues.add(queue);
}
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- synchronized (_messageEnqueueMap)
- {
- List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
- if (queues == null || !queues.contains(queue))
- {
- throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
- + " but it is not enqueued on that queue.");
- }
- else
- {
- queues.remove(queue);
- if (queues.isEmpty())
- {
- removeMessage(context,messageId);
- }
- }
- }
+ // Not required to do anything
+ }
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ // Not required to do anything
}
public void beginTran(StoreContext context) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index fdb56a1a55..b5ae8ea284 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -21,6 +21,12 @@
package org.apache.qpid.server.store;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
/**
* A context that the store can use to associate with a transactional context. For example, it could store
@@ -32,17 +38,37 @@ public class StoreContext
{
private static final Logger _logger = Logger.getLogger(StoreContext.class);
+ private static final String DEFAULT_NAME = "StoreContext";
private String _name;
private Object _payload;
+ private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
+ private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
+ private boolean _async;
public StoreContext()
{
- _name = "StoreContext";
+ this(DEFAULT_NAME);
}
public StoreContext(String name)
{
+ this(name,false);
+ }
+
+ /**
+ *
+ * @param name The name of this Transaction
+ * @param asynchrouous Is this Transaction Asynchronous
+ */
+ public StoreContext(String name, boolean asynchrouous)
+ {
_name = name;
+ _async = asynchrouous;
+ }
+
+ public StoreContext(boolean asynchronous)
+ {
+ this(DEFAULT_NAME, asynchronous);
}
public Object getPayload()
@@ -52,7 +78,7 @@ public class StoreContext
public void setPayload(Object payload)
{
- if(_logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
_logger.debug("public void setPayload(Object payload = " + payload + "): called");
}
@@ -68,4 +94,95 @@ public class StoreContext
{
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
+
+ public Map<Long, ArrayList<AMQQueue>> getEnqueueMap()
+ {
+ return _enqueueMap;
+ }
+
+ public Map<Long, ArrayList<AMQQueue>> getDequeueMap()
+ {
+ return _dequeueMap;
+ }
+
+ /**
+ * Record the enqueues for processing if we abort
+ *
+ * @param queues
+ * @param messageId
+ *
+ * @throws AMQException
+ */
+ public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ if (inTransaction())
+ {
+ ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId);
+
+ if (enqueues == null)
+ {
+ enqueues = new ArrayList<AMQQueue>();
+ _enqueueMap.put(messageId, enqueues);
+ }
+
+ for (AMQQueue q : queues)
+ {
+ if (!enqueues.contains(q))
+ {
+ enqueues.add(q);
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Record the dequeue for processing on commit
+ *
+ * @param queue
+ * @param messageId
+ *
+ * @throws AMQException
+ */
+ public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
+ {
+ if (inTransaction())
+ {
+ ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+
+ if (dequeues == null)
+ {
+ dequeues = new ArrayList<AMQQueue>();
+ _dequeueMap.put(messageId, dequeues);
+ }
+
+ dequeues.add(queue);
+ }
+ }
+
+ public void beginTransaction() throws AMQException
+ {
+ _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ }
+
+ public void commitTransaction() throws AMQException
+ {
+ _dequeueMap.clear();
+ }
+
+ public void abortTransaction() throws AMQException
+ {
+ _enqueueMap.clear();
+ }
+
+ public boolean inTransaction()
+ {
+ return _payload != null;
+ }
+
+ public boolean isAsync()
+ {
+ return _async;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
new file mode 100644
index 0000000000..4c3f1fcc49
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class BaseTransactionLog implements TransactionLog
+{
+ private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
+
+ TransactionLog _delegate;
+ private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+
+ public BaseTransactionLog(TransactionLog delegate)
+ {
+ _delegate = delegate;
+ }
+
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ return _delegate.configure(virtualHost, base, config);
+ }
+
+ public void close() throws Exception
+ {
+ _delegate.close();
+ }
+
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ context.enqueueMessage(queues, messageId);
+
+ if (queues.size() > 0)
+ {
+ _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+
+ //Clone the list incase someone else changes it.
+ _idToQueues.put(messageId, (ArrayList) queues.clone());
+ }
+
+ _delegate.enqueueMessage(context, queues, messageId);
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ if (context.inTransaction())
+ {
+ context.dequeueMessage(queue, messageId);
+
+ Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
+
+ //For each Message ID that is in the map check
+ for (Long messageID : messageMap.keySet())
+ {
+ //If we don't have a gloabl reference for this message then there is only a single enqueue
+ if (_idToQueues.get(messageID) == null)
+ {
+ // Add the removal of the message to this transaction
+ _delegate.removeMessage(context,messageID);
+ // Remove this message ID as we have processed it so we don't reprocess after the main commmit
+ messageMap.remove(messageID);
+ }
+ }
+ }
+
+ _delegate.dequeueMessage(context, queue, messageId);
+
+ if (!context.inTransaction())
+ {
+ HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
+ ArrayList list = new ArrayList();
+ list.add(queue);
+ dequeue.put(messageId, list);
+ processDequeues(dequeue);
+ }
+ }
+
+ /**
+ * This should not be called from main broker code.
+ * // Perhaps we need a new interface:
+ *
+ * Broker <->TransactionLog
+ * Broker <->BaseTransactionLog<->(Log with removeMessage())
+ */
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ _delegate.removeMessage(context, messageId);
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ context.beginTransaction();
+ _delegate.beginTran(context);
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ //Perform real commit of current data
+ _delegate.commitTran(context);
+
+ // If we have dequeues to process then process them
+ if (context.getDequeueMap() != null)
+ {
+ processDequeues(context.getDequeueMap());
+ }
+
+ //Commit the recorded state for this transaction.
+ context.commitTransaction();
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ // If we have enqueues to rollback
+ if (context.getEnqueueMap() != null)
+ {
+ processDequeues(context.getEnqueueMap());
+ }
+ //Abort the recorded state for this transaction.
+ context.abortTransaction();
+
+ _delegate.abortTran(context);
+ }
+
+ private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
+ throws AMQException
+ {
+ // Process any enqueues to bring our model up to date.
+ Set<Long> messageIDs = messageMap.keySet();
+
+ //Create a new Asynchronous Context.
+ StoreContext removeContext = new StoreContext(true);
+
+ //Batch Process the Dequeues on the delegate
+ _delegate.beginTran(removeContext);
+
+ try
+ {
+ //For each Message ID Decrement the reference for each of the queues it was on.
+ for (Long messageID : messageIDs)
+ {
+ ArrayList<AMQQueue> queueList = messageMap.get(messageID);
+
+ // For each of the queues decrement the reference
+ for (AMQQueue queue : queueList)
+ {
+ ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+
+ // If we have no mapping then this message was only enqueued on a single queue
+ // This will be the case when we are not in a larger transaction
+ if (enqueuedList == null)
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ }
+ else
+ {
+ // Update the enqueued list
+ enqueuedList.remove(queue);
+
+ // If the list is now empty then remove the message
+ if (enqueuedList.isEmpty())
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ }
+ }
+ }
+ }
+
+ //Commit the removes on the delegate.
+ _delegate.commitTran(removeContext);
+ }
+ finally
+ {
+ if (removeContext.inTransaction())
+ {
+ _delegate.abortTran(removeContext);
+ }
+ }
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _delegate.inTran(context);
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ _delegate.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _delegate.storeMessageMetaData(context, messageId, messageMetaData);
+ }
+
+ public boolean isPersistent()
+ {
+ return _delegate.isPersistent();
+ }
+
+ public TransactionLog getDelegate()
+ {
+ return _delegate;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
index 97a1ecb38c..73d57df6e6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
@@ -20,19 +20,16 @@
*/
package org.apache.qpid.server.transactionlog;
-import org.apache.commons.configuration.Configuration;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.StoreContext;
+import java.util.ArrayList;
+
/**
* TransactionLog defines the interface for performing transactions.
* This is used to preserve the state of messages, queues
@@ -68,7 +65,7 @@ public interface TransactionLog
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
+ Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
@@ -81,27 +78,33 @@ public interface TransactionLog
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to enqueue.
- * @throws AMQException If the operation fails for any reason.
+ * @param queues
+ *@param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException
*/
- void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException;
/**
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The queue to place the message on.
- * @param messageId The message to dequeue.
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @param queue
+ * @param messageId The message to dequeue. @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
+ * Remove the specified message from the log
+ *
+ * @param context The transactional context for the operation
+ * @param messageId The message to remove
+ * @throws AMQException
+ */
+ void removeMessage(StoreContext context, Long messageId) throws AMQException;
+
+ /**
* Begins a transactional context.
*
* @param context The transactional context to begin.
- *
* @throws AMQException If the operation fails for any reason.
*/
void beginTran(StoreContext context) throws AMQException;
@@ -158,31 +161,31 @@ public interface TransactionLog
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
-
- /**
- * Retrieves message meta-data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the meta-data for.
- *
- * @return The message meta data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
-
- /**
- * Retrieves a chunk of message data.
- *
- * @param context The transactional context for the operation.
- * @param messageId The message to get the data chunk for.
- * @param index The offset index of the data chunk within the message.
- *
- * @return A chunk of message data.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
- */
- ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+//
+// /**
+// * Retrieves message meta-data.
+// *
+// * @param context The transactional context for the operation.
+// * @param messageId The message to get the meta-data for.
+// *
+// * @return The message meta data.
+// *
+// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+// */
+// MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+//
+// /**
+// * Retrieves a chunk of message data.
+// *
+// * @param context The transactional context for the operation.
+// * @param messageId The message to get the data chunk for.
+// * @param index The offset index of the data chunk within the message.
+// *
+// * @return A chunk of message data.
+// *
+// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+// */
+// ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
/**
* Is this store capable of persisting the data
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 8a8cbd23cf..7bcfb9f59a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -51,6 +51,7 @@ import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -206,6 +207,14 @@ public class VirtualHost implements Accessable
{
_routingTable = (RoutingTable) _transactionLog;
}
+ else if (_transactionLog instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ }
}
else
{
@@ -292,7 +301,8 @@ public class VirtualHost implements Accessable
_routingTable = (RoutingTable) _transactionLog;
}
- _transactionLog.configure(this, "store", config);
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
}
//todo we need to move from store.class to transactionlog.class
@@ -497,8 +507,9 @@ public class VirtualHost implements Accessable
public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
+ return null;
}
public void close() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index d7844730d1..eb9c8653af 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
@@ -111,7 +112,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
//Create a priorityQueue
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 3d189ae6c5..89dbc4f959 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog());
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 9f8d5f9a99..3280516b56 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -75,7 +75,7 @@ public class AckTest extends TestCase
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
+ _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index 7a944a5399..d007913a4f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -109,7 +109,7 @@ public class PersistentMessageTest extends TransientMessageTest
// Check that it was not enqueued
List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
- assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+ assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
checkMessageMetaDataRemoved(messageId);
assertEquals("Return message count not correct", 1, _returnMessages.size());
@@ -152,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest
{
assertNull("Message MetaData still exists for message:" + messageId,
_messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
- assertNull("Message still has values in the reference map:" + messageId,
- _messageStore.getMessageReferenceMap(messageId));
+ List ids = _messageStore.getMessageReferenceMap(messageId);
+ assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f39dfe765e..d4b1de29b2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -319,7 +319,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -351,17 +351,17 @@ public class SimpleAMQQueueTest extends TestCase
MockQueueEntry entry = new MockQueueEntry(message, _queue);
entry.getQueueEntryList().add(message);
entry.acquire();
- entry.dequeue(null);
+ entry.dequeue(new StoreContext());
// Check that it is dequeued
data = _transactionLog.getMessageReferenceMap(messageId);
- assertNull(data);
+ assertTrue(data == null || data.isEmpty());
}
public void testMessagesFlowToDisk() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
MESSAGE_SIZE = 1;
long MEMORY_MAX = 500;
@@ -431,7 +431,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
MESSAGE_SIZE = 1;
/** Set to larger than the purge batch size. Default 100.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 0a30d855b3..d6e658958e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.routing.RoutingTable;
import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -48,9 +49,9 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable
{
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+ return this;
}
public void close() throws Exception
@@ -146,7 +147,7 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 456e816a52..fa5cdc1aa5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -21,155 +21,191 @@
package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
+/** Adds some extra methods to the memory message store for testing purposes. */
public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
{
+ private TransactionLog _transactionLog;
+ private RoutingTable _routingTable;
+ private MemoryMessageStore _mms;
+
+ public TestableMemoryMessageStore(TransactionLog log)
+ {
+ _transactionLog = log;
+ if (log instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ else
+ {
+ throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
+ }
+
+ if (delegate instanceof MemoryMessageStore)
+ {
+ _mms = (MemoryMessageStore) delegate;
+ }
+
+ }
+ else
+ {
+ throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+ }
- MemoryMessageStore _mms = null;
+ }
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
- _mms = mms;
+ _routingTable = mms;
+ _transactionLog = mms.configure();
}
public TestableMemoryMessageStore()
{
_mms = new MemoryMessageStore();
- _mms.configure();
+ _transactionLog = _mms.configure();
+ _routingTable = _mms;
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- return _mms._metaDataMap;
+ return ((MemoryMessageStore) _routingTable)._metaDataMap;
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- return _mms._contentBodyMap;
+ return ((MemoryMessageStore) _routingTable)._contentBodyMap;
}
public List<AMQQueue> getMessageReferenceMap(Long messageId)
{
- return _mms._messageEnqueueMap.get(messageId);
+// return _mms._messageEnqueueMap.get(messageId);
+// ((BaseTransactionLog)_transactionLog).
+ return new ArrayList<AMQQueue>();
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- _mms.configure(virtualHost,base,config);
+ _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
+ return _transactionLog;
}
public void close() throws Exception
{
- _mms.close();
+ _transactionLog.close();
+ _routingTable.close();
}
public void createExchange(Exchange exchange) throws AMQException
{
- _mms.createExchange(exchange);
+ _routingTable.createExchange(exchange);
}
public void removeExchange(Exchange exchange) throws AMQException
{
- _mms.removeExchange(exchange);
+ _routingTable.removeExchange(exchange);
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
- _mms.bindQueue(exchange,routingKey,queue,args);
+ _routingTable.bindQueue(exchange, routingKey, queue, args);
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
- _mms.unbindQueue(exchange,routingKey,queue,args);
+ _routingTable.unbindQueue(exchange, routingKey, queue, args);
}
public void createQueue(AMQQueue queue) throws AMQException
{
- _mms.createQueue(queue);
+ _routingTable.createQueue(queue);
}
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- _mms.createQueue(queue,arguments);
+ _routingTable.createQueue(queue, arguments);
}
public void removeQueue(AMQQueue queue) throws AMQException
{
- _mms.removeQueue(queue);
+ _routingTable.removeQueue(queue);
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- _mms.enqueueMessage(context,queue,messageId);
+ _transactionLog.enqueueMessage(context, queues, messageId);
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
- _mms.dequeueMessage(context,queue,messageId);
+ _transactionLog.dequeueMessage(context, queue, messageId);
+ }
+
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ _transactionLog.removeMessage(context, messageId);
}
public void beginTran(StoreContext context) throws AMQException
{
- _mms.beginTran(context);
+ _transactionLog.beginTran(context);
}
public void commitTran(StoreContext context) throws AMQException
{
- _mms.commitTran(context);
+ _transactionLog.commitTran(context);
}
public void abortTran(StoreContext context) throws AMQException
{
- _mms.abortTran(context);
+ _transactionLog.abortTran(context);
}
public boolean inTran(StoreContext context)
{
- return _mms.inTran(context);
+ return _transactionLog.inTran(context);
}
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
- _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+ _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
}
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
- _mms.storeMessageMetaData(context,messageId,messageMetaData);
+ _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ public boolean isPersistent()
{
- return _mms.getMessageMetaData(context,messageId);
+ return _transactionLog.isPersistent();
}
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
- return _mms.getContentBodyChunk(context,messageId,index);
+ return _mms.getMessageMetaData(context, messageId);
}
- public boolean isPersistent()
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- return _mms.isPersistent();
+ return _mms.getContentBodyChunk(context, messageId, index);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
new file mode 100644
index 0000000000..d3294d4c10
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
@@ -0,0 +1,535 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class BaseTransactionLogTest extends TestCase implements TransactionLog
+{
+ private boolean _inTransaction;
+ final private Map<Long, ArrayList<AMQQueue>> _enqueues = new HashMap<Long, ArrayList<AMQQueue>>();
+ final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
+ final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
+
+ BaseTransactionLog _transactionLog;
+ private ArrayList<AMQQueue> _queues;
+ private MockPersistentAMQMessage _message;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _transactionLog = new BaseTransactionLog(this);
+ }
+
+ public void testSingleEnqueueNoTransactional() throws AMQException
+ {
+ //Store Data
+
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+ verifyMessageStored(_message.getMessageId());
+ // Enqueue
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testSingleDequeueNoTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testSingleEnqueueNoTransactional();
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ public void testSingleEnqueueTransactional() throws AMQException
+ {
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ //Store Data
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+ _transactionLog.commitTran(context);
+
+ verifyMessageStored(_message.getMessageId());
+
+ // Enqueue
+ _transactionLog.beginTran(context);
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testSingleDequeueTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testSingleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+
+ public void testMultipleEnqueueNoTransactional() throws AMQException
+ {
+ //Store Data
+
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+
+ verifyMessageStored(_message.getMessageId());
+ // Enqueue
+
+ _queues = new ArrayList<AMQQueue>();
+
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "2");
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "3");
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testMultipleDequeueNoTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueNoTransactional();
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+
+ public void testMultipleEnqueueTransactional() throws AMQException
+ {
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ //Store Data
+ _message = new MockPersistentAMQMessage(1L, this);
+
+ _message.addContentBodyFrame(context, new MockContentChunk(100), true);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl();
+
+ ContentHeaderBody chb = new ContentHeaderBody();
+
+ _message.setPublishAndContentHeaderBody(context, mpi, chb);
+
+ _transactionLog.commitTran(context);
+
+ verifyMessageStored(_message.getMessageId());
+
+ // Enqueue
+ _transactionLog.beginTran(context);
+
+ _queues = new ArrayList<AMQQueue>();
+ MockAMQQueue queue = new MockAMQQueue(this.getName());
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "2");
+ _queues.add(queue);
+
+ queue = new MockAMQQueue(this.getName() + "3");
+ _queues.add(queue);
+
+ _transactionLog.enqueueMessage(context, _queues, _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ }
+
+ public void testMultipleDequeueMultipleTransactions() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ public void testMultipleDequeueSingleTransaction() throws AMQException
+ {
+ // Enqueue a message to dequeue
+ testMultipleEnqueueTransactional();
+
+ StoreContext context = new StoreContext();
+
+ _transactionLog.beginTran(context);
+
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
+
+ ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
+
+
+ enqueued = _enqueues.get(_message.getMessageId());
+ assertNotNull("Message not enqueued", enqueued);
+ assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
+ assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
+
+ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+
+
+ _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
+
+ _transactionLog.commitTran(context);
+
+ assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
+ assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ }
+
+ private void verifyMessageStored(Long messageId)
+ {
+ assertTrue("MessageMD has not been stored", _storeMetaData.containsKey(messageId));
+ assertTrue("Messasge Chunk has not been stored", _storeChunks.containsKey(messageId));
+ }
+
+ private void verifyEnqueuedOnQueues(Long messageId, ArrayList<AMQQueue> queues)
+ {
+ ArrayList<AMQQueue> enqueues = _enqueues.get(messageId);
+
+ assertNotNull("Message not enqueued", enqueues);
+ assertEquals("Message is not enqueued on the right number of queues", queues.size(), enqueues.size());
+ for (AMQQueue queue : queues)
+ {
+ assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+ }
+ }
+
+ /*************************** TransactionLog *******************************
+ *
+ * Simple InMemory TransactionLog that actually records enqueues/dequeues
+ */
+
+ /**
+ * @param virtualHost The virtual host using by this store
+ * @param base The base element identifier from which all configuration items are relative. For example, if
+ * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object.
+ *
+ * @return
+ *
+ * @throws Exception
+ */
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ return this;
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
+ {
+ for (AMQQueue queue : queues)
+ {
+ enqueueMessage(messageId, queue);
+ }
+ }
+
+ private void enqueueMessage(Long messageId, AMQQueue queue)
+ {
+ ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+ if (queues == null)
+ {
+ synchronized (_enqueues)
+ {
+ queues = _enqueues.get(messageId);
+ if (queues == null)
+ {
+ queues = new ArrayList<AMQQueue>();
+ _enqueues.put(messageId, queues);
+ }
+ }
+ }
+
+ synchronized (queues)
+ {
+ queues.add(queue);
+ }
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ ArrayList<AMQQueue> queues = _enqueues.get(messageId);
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no enqueue data available");
+ }
+
+ synchronized (queues)
+ {
+ if (!queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no message not enqueued on queue");
+ }
+
+ queues.remove(queue);
+ }
+ }
+
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ ArrayList<AMQQueue> queues;
+
+ synchronized (_enqueues)
+ {
+ queues = _enqueues.remove(messageId);
+ }
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Attempt to remove message(" + messageId + ") but " +
+ "no enqueue data available");
+ }
+
+ if (!queues.isEmpty())
+ {
+ throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
+ }
+
+ synchronized (_storeMetaData)
+ {
+ _storeMetaData.remove(messageId);
+ }
+
+ synchronized (_storeChunks)
+ {
+ _storeChunks.remove(messageId);
+ }
+
+ }
+
+ //
+ // This class does not attempt to operate transactionally. It only knows when it should be in a transaction.
+ // Data is stored immediately.
+ //
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ context.setPayload(new Object());
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ context.setPayload(null);
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ _inTransaction = false;
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _inTransaction;
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ ArrayList<ContentChunk> chunks = _storeChunks.get(messageId);
+
+ if (chunks == null)
+ {
+ synchronized (_storeChunks)
+ {
+ chunks = _storeChunks.get(messageId);
+ if (chunks == null)
+ {
+ chunks = new ArrayList<ContentChunk>();
+ _storeChunks.put(messageId, chunks);
+ }
+ }
+ }
+
+ synchronized (chunks)
+ {
+ chunks.add(contentBody);
+ }
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ if (_storeMetaData.get(messageId) != null)
+ {
+ throw new RuntimeException("Attempt to storeMessageMetaData for messageId(" + messageId + ") but data already exists");
+ }
+
+ synchronized (_storeMetaData)
+ {
+ _storeMetaData.put(messageId, messageMetaData);
+ }
+ }
+
+ public boolean isPersistent()
+ {
+ return false;
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 26802b4210..1210423d1b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -69,7 +69,7 @@ public class TxnBufferTest extends TestCase
public void testCommitWithFailureDuringPrepare() throws AMQException
{
MockStore store = new MockStore();
- store.beginTran(null);
+ store.beginTran(new StoreContext());
TxnBuffer buffer = new TxnBuffer();
buffer.enlist(new StoreMessageOperation(store));
@@ -81,7 +81,7 @@ public class TxnBufferTest extends TestCase
try
{
- buffer.commit(null);
+ buffer.commit(new StoreContext());
}
catch (NoSuchElementException e)
{
@@ -95,7 +95,7 @@ public class TxnBufferTest extends TestCase
public void testCommitWithPersistance() throws AMQException
{
MockStore store = new MockStore();
- store.beginTran(null);
+ store.beginTran(new StoreContext());
store.expectCommit();
TxnBuffer buffer = new TxnBuffer();
@@ -105,7 +105,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new TxnTester(store));
- buffer.commit(null);
+ buffer.commit(new StoreContext());
validateOps();
store.validate();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
index 1274b99880..ed79f1cc4f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
@@ -39,8 +39,7 @@ public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase
_virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
assertNotNull(_virtualHost.getTransactionLog());
- assertNotNull(_virtualHost.getRoutingTable());
- assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable());
+ assertNotNull(_virtualHost.getRoutingTable());
}
catch (Exception e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index c7c2c8b292..662f04b3c9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.routing.RoutingTable;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.ArrayList;
public class SlowMessageStore implements TransactionLog, RoutingTable
{
@@ -50,7 +51,7 @@ public class SlowMessageStore implements TransactionLog, RoutingTable
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
_logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
Configuration delays = config.getStoreConfiguration().subset(DELAYS);
@@ -81,7 +82,11 @@ public class SlowMessageStore implements TransactionLog, RoutingTable
_realTransactionLog = (TransactionLog) o;
}
}
- _realTransactionLog.configure(virtualHost, base , config);
+
+ // The call to configure may return a new transaction log
+ _realTransactionLog = (TransactionLog) _realTransactionLog.configure(virtualHost, base , config);
+
+ return this;
}
private void configureDelays(Configuration config)
@@ -205,10 +210,10 @@ public class SlowMessageStore implements TransactionLog, RoutingTable
doPostDelay("removeQueue");
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
doPreDelay("enqueueMessage");
- _realTransactionLog.enqueueMessage(context, queue, messageId);
+ _realTransactionLog.enqueueMessage(context, queues, messageId);
doPostDelay("enqueueMessage");
}
@@ -219,6 +224,13 @@ public class SlowMessageStore implements TransactionLog, RoutingTable
doPostDelay("dequeueMessage");
}
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ {
+ doPreDelay("dequeueMessage");
+ _realTransactionLog.removeMessage(context, messageId);
+ doPostDelay("dequeueMessage");
+ }
+
public void beginTran(StoreContext context) throws AMQException
{
doPreDelay("beginTran");
@@ -262,22 +274,22 @@ public class SlowMessageStore implements TransactionLog, RoutingTable
doPostDelay("storeMessageMetaData");
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- doPreDelay("getMessageMetaData");
- MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
- doPostDelay("getMessageMetaData");
- return mmd;
- }
-
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- doPreDelay("getContentBodyChunk");
- ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
- doPostDelay("getContentBodyChunk");
- return c;
- }
-
+// public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+// {
+// doPreDelay("getMessageMetaData");
+// MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
+// doPostDelay("getMessageMetaData");
+// return mmd;
+// }
+//
+// public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+// {
+// doPreDelay("getContentBodyChunk");
+// ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
+// doPostDelay("getContentBodyChunk");
+// return c;
+// }
+//
public boolean isPersistent()
{
return _realTransactionLog.isPersistent();