diff options
Diffstat (limited to 'qpid/java')
20 files changed, 1124 insertions, 176 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5eafd281c0..bab19fbc54 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -136,11 +136,7 @@ public class IncomingMessage implements Filterable<RuntimeException> if(_destinationQueues != null) { - for (int i = 0; i < _destinationQueues.size(); i++) - { - transactionLog.enqueueMessage(_txnContext.getStoreContext(), - _destinationQueues.get(i), getMessageId()); - } + transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues, getMessageId()); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ed9b1eb8d7..e5898ceda9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -849,7 +849,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); + //FIXME + //fixme + ArrayList list = new ArrayList(); + list.add(toQueue); + transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); } // dequeue will remove the messages from the queue entry.dequeue(storeContext); @@ -941,10 +945,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); + //fixme + //FIXME + ArrayList list = new ArrayList(); + list.add(toQueue); + transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); } } + // Commit and flush the move transcations. try { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java index 0c62638710..883a41b55f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java @@ -42,7 +42,7 @@ public interface RoutingTable * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; + Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 33b3d8608e..157418d806 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -34,6 +34,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -41,7 +42,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; @@ -143,7 +143,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable private State _state = State.INITIAL; - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config if (base.equals("store")) @@ -178,7 +178,9 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable recover(); stateTransition(State.RECOVERING, State.STARTED); + return new BaseTransactionLog(this); } + return null; } private static synchronized void initialiseDriver() throws ClassNotFoundException @@ -825,7 +827,18 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException + { + for (AMQQueue q : queues) + { + if (q.isDurable()) + { + enqueueMessage(context,q,messageId); + } + } + } + + void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { AMQShortString name = queue.getName(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 3754b41a3e..d57b81c362 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -30,17 +29,14 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,16 +63,16 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); - protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>(); - public void configure() + public TransactionLog configure() { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY); _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY); + return new BaseTransactionLog(this); } - public void configure(String base, VirtualHostConfiguration config) + public TransactionLog configure(String base, VirtualHostConfiguration config) { //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable' if (base.equals("store")) @@ -85,12 +81,14 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable _log.info("Using capacity " + hashtableCapacity + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); + return new BaseTransactionLog(this); } + return null; } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - configure(base, config); + return configure(base, config); } public void close() throws Exception @@ -108,7 +106,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } } - private void removeMessage(StoreContext context, Long messageId) throws AMQException + public void removeMessage(StoreContext context, Long messageId) throws AMQException { checkNotClosed(); if (_log.isDebugEnabled()) @@ -117,7 +115,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } _metaDataMap.remove(messageId); _contentBodyMap.remove(messageId); - _messageEnqueueMap.remove(messageId); } public void createExchange(Exchange exchange) throws AMQException @@ -155,41 +152,25 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable // Not required to do anything } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - synchronized (_messageEnqueueMap) + for (AMQQueue q : queues) { - List<AMQQueue> queues = _messageEnqueueMap.get(messageId); - if (queues == null) + if (q.isDurable()) { - queues = new LinkedList<AMQQueue>(); - _messageEnqueueMap.put(messageId, queues); + enqueueMessage(context,q,messageId); } - - queues.add(queue); } } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - synchronized (_messageEnqueueMap) - { - List<AMQQueue> queues = _messageEnqueueMap.get(messageId); - if (queues == null || !queues.contains(queue)) - { - throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName() - + " but it is not enqueued on that queue."); - } - else - { - queues.remove(queue); - if (queues.isEmpty()) - { - removeMessage(context,messageId); - } - } - } + // Not required to do anything + } + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + // Not required to do anything } public void beginTran(StoreContext context) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index fdb56a1a55..b5ae8ea284 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -21,6 +21,12 @@ package org.apache.qpid.server.store; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; /** * A context that the store can use to associate with a transactional context. For example, it could store @@ -32,17 +38,37 @@ public class StoreContext { private static final Logger _logger = Logger.getLogger(StoreContext.class); + private static final String DEFAULT_NAME = "StoreContext"; private String _name; private Object _payload; + private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap; + private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap; + private boolean _async; public StoreContext() { - _name = "StoreContext"; + this(DEFAULT_NAME); } public StoreContext(String name) { + this(name,false); + } + + /** + * + * @param name The name of this Transaction + * @param asynchrouous Is this Transaction Asynchronous + */ + public StoreContext(String name, boolean asynchrouous) + { _name = name; + _async = asynchrouous; + } + + public StoreContext(boolean asynchronous) + { + this(DEFAULT_NAME, asynchronous); } public Object getPayload() @@ -52,7 +78,7 @@ public class StoreContext public void setPayload(Object payload) { - if(_logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { _logger.debug("public void setPayload(Object payload = " + payload + "): called"); } @@ -68,4 +94,95 @@ public class StoreContext { return "<_name = " + _name + ", _payload = " + _payload + ">"; } + + public Map<Long, ArrayList<AMQQueue>> getEnqueueMap() + { + return _enqueueMap; + } + + public Map<Long, ArrayList<AMQQueue>> getDequeueMap() + { + return _dequeueMap; + } + + /** + * Record the enqueues for processing if we abort + * + * @param queues + * @param messageId + * + * @throws AMQException + */ + public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException + { + if (inTransaction()) + { + ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId); + + if (enqueues == null) + { + enqueues = new ArrayList<AMQQueue>(); + _enqueueMap.put(messageId, enqueues); + } + + for (AMQQueue q : queues) + { + if (!enqueues.contains(q)) + { + enqueues.add(q); + } + } + + } + } + + /** + * Record the dequeue for processing on commit + * + * @param queue + * @param messageId + * + * @throws AMQException + */ + public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException + { + if (inTransaction()) + { + ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId); + + if (dequeues == null) + { + dequeues = new ArrayList<AMQQueue>(); + _dequeueMap.put(messageId, dequeues); + } + + dequeues.add(queue); + } + } + + public void beginTransaction() throws AMQException + { + _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>(); + _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>(); + } + + public void commitTransaction() throws AMQException + { + _dequeueMap.clear(); + } + + public void abortTransaction() throws AMQException + { + _enqueueMap.clear(); + } + + public boolean inTransaction() + { + return _payload != null; + } + + public boolean isAsync() + { + return _async; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java new file mode 100644 index 0000000000..4c3f1fcc49 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class BaseTransactionLog implements TransactionLog +{ + private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class); + + TransactionLog _delegate; + private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>(); + + public BaseTransactionLog(TransactionLog delegate) + { + _delegate = delegate; + } + + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + return _delegate.configure(virtualHost, base, config); + } + + public void close() throws Exception + { + _delegate.close(); + } + + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException + { + context.enqueueMessage(queues, messageId); + + if (queues.size() > 0) + { + _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); + + //Clone the list incase someone else changes it. + _idToQueues.put(messageId, (ArrayList) queues.clone()); + } + + _delegate.enqueueMessage(context, queues, messageId); + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + if (context.inTransaction()) + { + context.dequeueMessage(queue, messageId); + + Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap(); + + //For each Message ID that is in the map check + for (Long messageID : messageMap.keySet()) + { + //If we don't have a gloabl reference for this message then there is only a single enqueue + if (_idToQueues.get(messageID) == null) + { + // Add the removal of the message to this transaction + _delegate.removeMessage(context,messageID); + // Remove this message ID as we have processed it so we don't reprocess after the main commmit + messageMap.remove(messageID); + } + } + } + + _delegate.dequeueMessage(context, queue, messageId); + + if (!context.inTransaction()) + { + HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>(); + ArrayList list = new ArrayList(); + list.add(queue); + dequeue.put(messageId, list); + processDequeues(dequeue); + } + } + + /** + * This should not be called from main broker code. + * // Perhaps we need a new interface: + * + * Broker <->TransactionLog + * Broker <->BaseTransactionLog<->(Log with removeMessage()) + */ + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + _delegate.removeMessage(context, messageId); + } + + public void beginTran(StoreContext context) throws AMQException + { + context.beginTransaction(); + _delegate.beginTran(context); + } + + public void commitTran(StoreContext context) throws AMQException + { + //Perform real commit of current data + _delegate.commitTran(context); + + // If we have dequeues to process then process them + if (context.getDequeueMap() != null) + { + processDequeues(context.getDequeueMap()); + } + + //Commit the recorded state for this transaction. + context.commitTransaction(); + } + + public void abortTran(StoreContext context) throws AMQException + { + // If we have enqueues to rollback + if (context.getEnqueueMap() != null) + { + processDequeues(context.getEnqueueMap()); + } + //Abort the recorded state for this transaction. + context.abortTransaction(); + + _delegate.abortTran(context); + } + + private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap) + throws AMQException + { + // Process any enqueues to bring our model up to date. + Set<Long> messageIDs = messageMap.keySet(); + + //Create a new Asynchronous Context. + StoreContext removeContext = new StoreContext(true); + + //Batch Process the Dequeues on the delegate + _delegate.beginTran(removeContext); + + try + { + //For each Message ID Decrement the reference for each of the queues it was on. + for (Long messageID : messageIDs) + { + ArrayList<AMQQueue> queueList = messageMap.get(messageID); + + // For each of the queues decrement the reference + for (AMQQueue queue : queueList) + { + ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID); + + // If we have no mapping then this message was only enqueued on a single queue + // This will be the case when we are not in a larger transaction + if (enqueuedList == null) + { + _delegate.removeMessage(removeContext, messageID); + } + else + { + // Update the enqueued list + enqueuedList.remove(queue); + + // If the list is now empty then remove the message + if (enqueuedList.isEmpty()) + { + _delegate.removeMessage(removeContext, messageID); + } + } + } + } + + //Commit the removes on the delegate. + _delegate.commitTran(removeContext); + } + finally + { + if (removeContext.inTransaction()) + { + _delegate.abortTran(removeContext); + } + } + } + + public boolean inTran(StoreContext context) + { + return _delegate.inTran(context); + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + _delegate.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + _delegate.storeMessageMetaData(context, messageId, messageMetaData); + } + + public boolean isPersistent() + { + return _delegate.isPersistent(); + } + + public TransactionLog getDelegate() + { + return _delegate; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java index 97a1ecb38c..73d57df6e6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java @@ -20,19 +20,16 @@ */ package org.apache.qpid.server.transactionlog; -import org.apache.commons.configuration.Configuration; - import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.store.StoreContext; +import java.util.ArrayList; + /** * TransactionLog defines the interface for performing transactions. * This is used to preserve the state of messages, queues @@ -68,7 +65,7 @@ public interface TransactionLog * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; + Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. @@ -81,27 +78,33 @@ public interface TransactionLog * Places a message onto a specified queue, in a given transactional context. * * @param context The transactional context for the operation. - * @param queue The queue to place the message on. - * @param messageId The message to enqueue. - * @throws AMQException If the operation fails for any reason. + * @param queues + *@param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException */ - void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException; + void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException; /** * Extracts a message from a specified queue, in a given transactional context. * * @param context The transactional context for the operation. - * @param queue The queue to place the message on. - * @param messageId The message to dequeue. - * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + * @param queue + * @param messageId The message to dequeue. @throws AMQException If the operation fails for any reason, or if the specified message does not exist. */ void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException; /** + * Remove the specified message from the log + * + * @param context The transactional context for the operation + * @param messageId The message to remove + * @throws AMQException + */ + void removeMessage(StoreContext context, Long messageId) throws AMQException; + + /** * Begins a transactional context. * * @param context The transactional context to begin. - * * @throws AMQException If the operation fails for any reason. */ void beginTran(StoreContext context) throws AMQException; @@ -158,31 +161,31 @@ public interface TransactionLog * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. */ void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; - - /** - * Retrieves message meta-data. - * - * @param context The transactional context for the operation. - * @param messageId The message to get the meta-data for. - * - * @return The message meta data. - * - * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. - */ - MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; - - /** - * Retrieves a chunk of message data. - * - * @param context The transactional context for the operation. - * @param messageId The message to get the data chunk for. - * @param index The offset index of the data chunk within the message. - * - * @return A chunk of message data. - * - * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. - */ - ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; +// +// /** +// * Retrieves message meta-data. +// * +// * @param context The transactional context for the operation. +// * @param messageId The message to get the meta-data for. +// * +// * @return The message meta data. +// * +// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. +// */ +// MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; +// +// /** +// * Retrieves a chunk of message data. +// * +// * @param context The transactional context for the operation. +// * @param messageId The message to get the data chunk for. +// * @param index The offset index of the data chunk within the message. +// * +// * @return A chunk of message data. +// * +// * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. +// */ +// ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; /** * Is this store capable of persisting the data diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8a8cbd23cf..7bcfb9f59a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -51,6 +51,7 @@ import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; import javax.management.NotCompliantMBeanException; import java.util.Collections; @@ -206,6 +207,14 @@ public class VirtualHost implements Accessable { _routingTable = (RoutingTable) _transactionLog; } + else if (_transactionLog instanceof BaseTransactionLog) + { + TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate(); + if (delegate instanceof RoutingTable) + { + _routingTable = (RoutingTable) delegate; + } + } } else { @@ -292,7 +301,8 @@ public class VirtualHost implements Accessable _routingTable = (RoutingTable) _transactionLog; } - _transactionLog.configure(this, "store", config); + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config); } //todo we need to move from store.class to transactionlog.class @@ -497,8 +507,9 @@ public class VirtualHost implements Accessable public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { + return null; } public void close() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index d7844730d1..eb9c8653af 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; @@ -111,7 +112,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); //Create a priorityQueue _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 3d189ae6c5..89dbc4f959 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase private void verifyBrokerState() { - TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog()); + TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog()); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 9f8d5f9a99..3280516b56 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -75,7 +75,7 @@ public class AckTest extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog()); + _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog()); _protocolSession = new MockProtocolSession(_messageStore); _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index 7a944a5399..d007913a4f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -109,7 +109,7 @@ public class PersistentMessageTest extends TransientMessageTest // Check that it was not enqueued List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); - assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList); + assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty()); checkMessageMetaDataRemoved(messageId); assertEquals("Return message count not correct", 1, _returnMessages.size()); @@ -152,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest { assertNull("Message MetaData still exists for message:" + messageId, _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); - assertNull("Message still has values in the reference map:" + messageId, - _messageStore.getMessageReferenceMap(messageId)); + List ids = _messageStore.getMessageReferenceMap(messageId); + assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty()); } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f39dfe765e..d4b1de29b2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -319,7 +319,7 @@ public class SimpleAMQQueueTest extends TestCase public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); @@ -351,17 +351,17 @@ public class SimpleAMQQueueTest extends TestCase MockQueueEntry entry = new MockQueueEntry(message, _queue); entry.getQueueEntryList().add(message); entry.acquire(); - entry.dequeue(null); + entry.dequeue(new StoreContext()); // Check that it is dequeued data = _transactionLog.getMessageReferenceMap(messageId); - assertNull(data); + assertTrue(data == null || data.isEmpty()); } public void testMessagesFlowToDisk() throws AMQException, InterruptedException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); MESSAGE_SIZE = 1; long MEMORY_MAX = 500; @@ -431,7 +431,7 @@ public class SimpleAMQQueueTest extends TestCase public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException { // Create IncomingMessage and nondurable queue - NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null); MESSAGE_SIZE = 1; /** Set to larger than the purge batch size. Default 100. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 0a30d855b3..d6e658958e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.routing.RoutingTable; import java.util.List; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; /** @@ -48,9 +49,9 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable { } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. + return this; } public void close() throws Exception @@ -146,7 +147,7 @@ public class SkeletonMessageStore implements TransactionLog , RoutingTable } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException { } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 456e816a52..fa5cdc1aa5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -21,155 +21,191 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.concurrent.ConcurrentMap; -/** - * Adds some extra methods to the memory message store for testing purposes. - */ +/** Adds some extra methods to the memory message store for testing purposes. */ public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable { + private TransactionLog _transactionLog; + private RoutingTable _routingTable; + private MemoryMessageStore _mms; + + public TestableMemoryMessageStore(TransactionLog log) + { + _transactionLog = log; + if (log instanceof BaseTransactionLog) + { + TransactionLog delegate = ((BaseTransactionLog) log).getDelegate(); + if (delegate instanceof RoutingTable) + { + _routingTable = (RoutingTable) delegate; + } + else + { + throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log); + } + + if (delegate instanceof MemoryMessageStore) + { + _mms = (MemoryMessageStore) delegate; + } + + } + else + { + throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log); + } - MemoryMessageStore _mms = null; + } public TestableMemoryMessageStore(MemoryMessageStore mms) { - _mms = mms; + _routingTable = mms; + _transactionLog = mms.configure(); } public TestableMemoryMessageStore() { _mms = new MemoryMessageStore(); - _mms.configure(); + _transactionLog = _mms.configure(); + _routingTable = _mms; } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() { - return _mms._metaDataMap; + return ((MemoryMessageStore) _routingTable)._metaDataMap; } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - return _mms._contentBodyMap; + return ((MemoryMessageStore) _routingTable)._contentBodyMap; } public List<AMQQueue> getMessageReferenceMap(Long messageId) { - return _mms._messageEnqueueMap.get(messageId); +// return _mms._messageEnqueueMap.get(messageId); +// ((BaseTransactionLog)_transactionLog). + return new ArrayList<AMQQueue>(); } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - _mms.configure(virtualHost,base,config); + _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config); + return _transactionLog; } public void close() throws Exception { - _mms.close(); + _transactionLog.close(); + _routingTable.close(); } public void createExchange(Exchange exchange) throws AMQException { - _mms.createExchange(exchange); + _routingTable.createExchange(exchange); } public void removeExchange(Exchange exchange) throws AMQException { - _mms.removeExchange(exchange); + _routingTable.removeExchange(exchange); } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { - _mms.bindQueue(exchange,routingKey,queue,args); + _routingTable.bindQueue(exchange, routingKey, queue, args); } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { - _mms.unbindQueue(exchange,routingKey,queue,args); + _routingTable.unbindQueue(exchange, routingKey, queue, args); } public void createQueue(AMQQueue queue) throws AMQException { - _mms.createQueue(queue); + _routingTable.createQueue(queue); } public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { - _mms.createQueue(queue,arguments); + _routingTable.createQueue(queue, arguments); } public void removeQueue(AMQQueue queue) throws AMQException { - _mms.removeQueue(queue); + _routingTable.removeQueue(queue); } - public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - _mms.enqueueMessage(context,queue,messageId); + _transactionLog.enqueueMessage(context, queues, messageId); } public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { - _mms.dequeueMessage(context,queue,messageId); + _transactionLog.dequeueMessage(context, queue, messageId); + } + + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + _transactionLog.removeMessage(context, messageId); } public void beginTran(StoreContext context) throws AMQException { - _mms.beginTran(context); + _transactionLog.beginTran(context); } public void commitTran(StoreContext context) throws AMQException { - _mms.commitTran(context); + _transactionLog.commitTran(context); } public void abortTran(StoreContext context) throws AMQException { - _mms.abortTran(context); + _transactionLog.abortTran(context); } public boolean inTran(StoreContext context) { - return _mms.inTran(context); + return _transactionLog.inTran(context); } public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { - _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody); + _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); } public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { - _mms.storeMessageMetaData(context,messageId,messageMetaData); + _transactionLog.storeMessageMetaData(context, messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + public boolean isPersistent() { - return _mms.getMessageMetaData(context,messageId); + return _transactionLog.isPersistent(); } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { - return _mms.getContentBodyChunk(context,messageId,index); + return _mms.getMessageMetaData(context, messageId); } - public boolean isPersistent() + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - return _mms.isPersistent(); + return _mms.getContentBodyChunk(context, messageId, index); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java new file mode 100644 index 0000000000..d3294d4c10 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -0,0 +1,535 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.MockContentChunk; +import org.apache.qpid.server.queue.MockPersistentAMQMessage; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class BaseTransactionLogTest extends TestCase implements TransactionLog +{ + private boolean _inTransaction; + final private Map<Long, ArrayList<AMQQueue>> _enqueues = new HashMap<Long, ArrayList<AMQQueue>>(); + final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>(); + final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>(); + + BaseTransactionLog _transactionLog; + private ArrayList<AMQQueue> _queues; + private MockPersistentAMQMessage _message; + + public void setUp() throws Exception + { + super.setUp(); + _transactionLog = new BaseTransactionLog(this); + } + + public void testSingleEnqueueNoTransactional() throws AMQException + { + //Store Data + + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + + verifyMessageStored(_message.getMessageId()); + // Enqueue + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testSingleDequeueNoTransaction() throws AMQException + { + // Enqueue a message to dequeue + testSingleEnqueueNoTransactional(); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + public void testSingleEnqueueTransactional() throws AMQException + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + //Store Data + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(context, new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(context, mpi, chb); + + _transactionLog.commitTran(context); + + verifyMessageStored(_message.getMessageId()); + + // Enqueue + _transactionLog.beginTran(context); + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + _transactionLog.enqueueMessage(context, _queues, _message.getMessageId()); + + _transactionLog.commitTran(context); + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testSingleDequeueTransaction() throws AMQException + { + // Enqueue a message to dequeue + testSingleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + + public void testMultipleEnqueueNoTransactional() throws AMQException + { + //Store Data + + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + + verifyMessageStored(_message.getMessageId()); + // Enqueue + + _queues = new ArrayList<AMQQueue>(); + + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "2"); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "3"); + _queues.add(queue); + + _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testMultipleDequeueNoTransaction() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueNoTransactional(); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId()); + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId()); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + + public void testMultipleEnqueueTransactional() throws AMQException + { + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + //Store Data + _message = new MockPersistentAMQMessage(1L, this); + + _message.addContentBodyFrame(context, new MockContentChunk(100), true); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + + ContentHeaderBody chb = new ContentHeaderBody(); + + _message.setPublishAndContentHeaderBody(context, mpi, chb); + + _transactionLog.commitTran(context); + + verifyMessageStored(_message.getMessageId()); + + // Enqueue + _transactionLog.beginTran(context); + + _queues = new ArrayList<AMQQueue>(); + MockAMQQueue queue = new MockAMQQueue(this.getName()); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "2"); + _queues.add(queue); + + queue = new MockAMQQueue(this.getName() + "3"); + _queues.add(queue); + + _transactionLog.enqueueMessage(context, _queues, _message.getMessageId()); + + _transactionLog.commitTran(context); + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + } + + public void testMultipleDequeueMultipleTransactions() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); + + _transactionLog.commitTran(context); + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); + + _transactionLog.commitTran(context); + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + public void testMultipleDequeueSingleTransaction() throws AMQException + { + // Enqueue a message to dequeue + testMultipleEnqueueTransactional(); + + StoreContext context = new StoreContext(); + + _transactionLog.beginTran(context); + + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); + + ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); + + + enqueued = _enqueues.get(_message.getMessageId()); + assertNotNull("Message not enqueued", enqueued); + assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); + assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); + + assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); + assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + + + _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); + + _transactionLog.commitTran(context); + + assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); + assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); + assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + } + + private void verifyMessageStored(Long messageId) + { + assertTrue("MessageMD has not been stored", _storeMetaData.containsKey(messageId)); + assertTrue("Messasge Chunk has not been stored", _storeChunks.containsKey(messageId)); + } + + private void verifyEnqueuedOnQueues(Long messageId, ArrayList<AMQQueue> queues) + { + ArrayList<AMQQueue> enqueues = _enqueues.get(messageId); + + assertNotNull("Message not enqueued", enqueues); + assertEquals("Message is not enqueued on the right number of queues", queues.size(), enqueues.size()); + for (AMQQueue queue : queues) + { + assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); + } + } + + /*************************** TransactionLog ******************************* + * + * Simple InMemory TransactionLog that actually records enqueues/dequeues + */ + + /** + * @param virtualHost The virtual host using by this store + * @param base The base element identifier from which all configuration items are relative. For example, if + * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object. + * + * @return + * + * @throws Exception + */ + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + return this; + } + + public void close() throws Exception + { + } + + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException + { + for (AMQQueue queue : queues) + { + enqueueMessage(messageId, queue); + } + } + + private void enqueueMessage(Long messageId, AMQQueue queue) + { + ArrayList<AMQQueue> queues = _enqueues.get(messageId); + + if (queues == null) + { + synchronized (_enqueues) + { + queues = _enqueues.get(messageId); + if (queues == null) + { + queues = new ArrayList<AMQQueue>(); + _enqueues.put(messageId, queues); + } + } + } + + synchronized (queues) + { + queues.add(queue); + } + } + + public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + { + ArrayList<AMQQueue> queues = _enqueues.get(messageId); + + if (queues == null) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no enqueue data available"); + } + + synchronized (queues) + { + if (!queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no message not enqueued on queue"); + } + + queues.remove(queue); + } + } + + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + ArrayList<AMQQueue> queues; + + synchronized (_enqueues) + { + queues = _enqueues.remove(messageId); + } + + if (queues == null) + { + throw new RuntimeException("Attempt to remove message(" + messageId + ") but " + + "no enqueue data available"); + } + + if (!queues.isEmpty()) + { + throw new RuntimeException("Removed a message(" + messageId + ") that still had references."); + } + + synchronized (_storeMetaData) + { + _storeMetaData.remove(messageId); + } + + synchronized (_storeChunks) + { + _storeChunks.remove(messageId); + } + + } + + // + // This class does not attempt to operate transactionally. It only knows when it should be in a transaction. + // Data is stored immediately. + // + + public void beginTran(StoreContext context) throws AMQException + { + context.setPayload(new Object()); + } + + public void commitTran(StoreContext context) throws AMQException + { + context.setPayload(null); + } + + public void abortTran(StoreContext context) throws AMQException + { + _inTransaction = false; + } + + public boolean inTran(StoreContext context) + { + return _inTransaction; + } + + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + { + ArrayList<ContentChunk> chunks = _storeChunks.get(messageId); + + if (chunks == null) + { + synchronized (_storeChunks) + { + chunks = _storeChunks.get(messageId); + if (chunks == null) + { + chunks = new ArrayList<ContentChunk>(); + _storeChunks.put(messageId, chunks); + } + } + } + + synchronized (chunks) + { + chunks.add(contentBody); + } + } + + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + { + if (_storeMetaData.get(messageId) != null) + { + throw new RuntimeException("Attempt to storeMessageMetaData for messageId(" + messageId + ") but data already exists"); + } + + synchronized (_storeMetaData) + { + _storeMetaData.put(messageId, messageMetaData); + } + } + + public boolean isPersistent() + { + return false; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index 26802b4210..1210423d1b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -69,7 +69,7 @@ public class TxnBufferTest extends TestCase public void testCommitWithFailureDuringPrepare() throws AMQException { MockStore store = new MockStore(); - store.beginTran(null); + store.beginTran(new StoreContext()); TxnBuffer buffer = new TxnBuffer(); buffer.enlist(new StoreMessageOperation(store)); @@ -81,7 +81,7 @@ public class TxnBufferTest extends TestCase try { - buffer.commit(null); + buffer.commit(new StoreContext()); } catch (NoSuchElementException e) { @@ -95,7 +95,7 @@ public class TxnBufferTest extends TestCase public void testCommitWithPersistance() throws AMQException { MockStore store = new MockStore(); - store.beginTran(null); + store.beginTran(new StoreContext()); store.expectCommit(); TxnBuffer buffer = new TxnBuffer(); @@ -105,7 +105,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new TxnTester(store)); - buffer.commit(null); + buffer.commit(new StoreContext()); validateOps(); store.validate(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java index 1274b99880..ed79f1cc4f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java @@ -39,8 +39,7 @@ public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); assertNotNull(_virtualHost.getTransactionLog()); - assertNotNull(_virtualHost.getRoutingTable()); - assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable()); + assertNotNull(_virtualHost.getRoutingTable()); } catch (Exception e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index c7c2c8b292..662f04b3c9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.routing.RoutingTable; import java.util.HashMap; import java.util.Iterator; +import java.util.ArrayList; public class SlowMessageStore implements TransactionLog, RoutingTable { @@ -50,7 +51,7 @@ public class SlowMessageStore implements TransactionLog, RoutingTable private static final String POST = "post"; private String DEFAULT_DELAY = "default"; - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { _logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); Configuration delays = config.getStoreConfiguration().subset(DELAYS); @@ -81,7 +82,11 @@ public class SlowMessageStore implements TransactionLog, RoutingTable _realTransactionLog = (TransactionLog) o; } } - _realTransactionLog.configure(virtualHost, base , config); + + // The call to configure may return a new transaction log + _realTransactionLog = (TransactionLog) _realTransactionLog.configure(virtualHost, base , config); + + return this; } private void configureDelays(Configuration config) @@ -205,10 +210,10 @@ public class SlowMessageStore implements TransactionLog, RoutingTable doPostDelay("removeQueue"); } - public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { doPreDelay("enqueueMessage"); - _realTransactionLog.enqueueMessage(context, queue, messageId); + _realTransactionLog.enqueueMessage(context, queues, messageId); doPostDelay("enqueueMessage"); } @@ -219,6 +224,13 @@ public class SlowMessageStore implements TransactionLog, RoutingTable doPostDelay("dequeueMessage"); } + public void removeMessage(StoreContext context, Long messageId) throws AMQException + { + doPreDelay("dequeueMessage"); + _realTransactionLog.removeMessage(context, messageId); + doPostDelay("dequeueMessage"); + } + public void beginTran(StoreContext context) throws AMQException { doPreDelay("beginTran"); @@ -262,22 +274,22 @@ public class SlowMessageStore implements TransactionLog, RoutingTable doPostDelay("storeMessageMetaData"); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException - { - doPreDelay("getMessageMetaData"); - MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId); - doPostDelay("getMessageMetaData"); - return mmd; - } - - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - doPreDelay("getContentBodyChunk"); - ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index); - doPostDelay("getContentBodyChunk"); - return c; - } - +// public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException +// { +// doPreDelay("getMessageMetaData"); +// MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId); +// doPostDelay("getMessageMetaData"); +// return mmd; +// } +// +// public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException +// { +// doPreDelay("getContentBodyChunk"); +// ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index); +// doPostDelay("getContentBodyChunk"); +// return c; +// } +// public boolean isPersistent() { return _realTransactionLog.isPersistent(); |
