diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-08 19:44:34 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-08 19:44:34 +0000 |
| commit | 9305a19f6d6d0eda39137483735c9924a22ab247 (patch) | |
| tree | b34d92bd9f4fd5ecafcc6cff73476453f69a6fcc /java/broker | |
| parent | 6ebda3595dd243e35966c9bd4d72e40af66d2a6d (diff) | |
| download | qpid-python-9305a19f6d6d0eda39137483735c9924a22ab247.tar.gz | |
QPID-1794 : Clear the StoreContext after non-transactional processing. Updated BaseTransactionLog to synchronize the on the enqueued messages from the _idToQueue Map as this will be being modified by many ack-ing threads and closing/requeue threads. Updated BaseTransactionLogTest so that it uses a single StoreContext per test rather than a fresh context for each operation. This was masking the problem.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@763361 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
4 files changed, 148 insertions, 117 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index bb50df139f..26892868f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.queue.AMQQueue; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.List; +import java.util.Collections; /** * A context that the store can use to associate with a transactional context. For example, it could store @@ -41,8 +43,7 @@ public class StoreContext private static final String DEFAULT_NAME = "StoreContext"; private String _name; private Object _payload; - private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap; - private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap; + private Map<Long, List<AMQQueue>> _dequeueMap; private boolean _async; private boolean _inTransaction; @@ -53,12 +54,11 @@ public class StoreContext public StoreContext(String name) { - this(name,false); + this(name, false); } /** - * - * @param name The name of this Transaction + * @param name The name of this Transaction * @param asynchrouous Is this Transaction Asynchronous */ public StoreContext(String name, boolean asynchrouous) @@ -66,8 +66,7 @@ public class StoreContext _name = name; _async = asynchrouous; _inTransaction = false; - _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>(); - _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>(); + _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>()); } public StoreContext(boolean asynchronous) @@ -86,7 +85,7 @@ public class StoreContext { _logger.debug("public void setPayload(Object payload = " + payload + "): called"); } - _payload = payload; + _payload = payload; } /** @@ -99,49 +98,13 @@ public class StoreContext return "<_name = " + _name + ", _payload = " + _payload + ">"; } - public Map<Long, ArrayList<AMQQueue>> getEnqueueMap() - { - return _enqueueMap; - } - - public Map<Long, ArrayList<AMQQueue>> getDequeueMap() + public Map<Long, List<AMQQueue>> getDequeueMap() { return _dequeueMap; } /** - * Record the enqueues for processing if we abort - * - * @param queues - * @param messageId - * - * @throws AMQException - */ - public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException - { - if (inTransaction()) - { - ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId); - - if (enqueues == null) - { - enqueues = new ArrayList<AMQQueue>(); - _enqueueMap.put(messageId, enqueues); - } - - for (AMQQueue q : queues) - { - if (!enqueues.contains(q)) - { - enqueues.add(q); - } - } - - } - } - - /** - * Record the dequeue for processing after the commit + * Record the dequeue for processing after the commit * * @param queue * @param messageId @@ -150,15 +113,22 @@ public class StoreContext */ public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException { - ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId); + List<AMQQueue> dequeues = _dequeueMap.get(messageId); - if (dequeues == null) + synchronized (_dequeueMap) { - dequeues = new ArrayList<AMQQueue>(); - _dequeueMap.put(messageId, dequeues); + if (dequeues == null) + { + dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>()); + _dequeueMap.put(messageId, dequeues); + } } dequeues.add(queue); + if (_logger.isInfoEnabled()) + { + _logger.info("Added (" + messageId + ") to dequeues:" + dequeues); + } } public void beginTransaction() throws AMQException @@ -174,13 +144,13 @@ public class StoreContext public void abortTransaction() throws AMQException { - _enqueueMap.clear(); + _dequeueMap.clear(); _inTransaction = false; } public boolean inTransaction() { - return _inTransaction; // _payload != null; + return _inTransaction; } public boolean isAsync() diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index dded7f7142..ce2d67cf60 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -31,16 +31,18 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.Iterator; +import java.util.Collections; +import java.util.List; public class BaseTransactionLog implements TransactionLog { private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class); TransactionLog _delegate; - protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>(); + protected Map<Long, List<AMQQueue>> _idToQueues = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>()); public BaseTransactionLog(TransactionLog delegate) { @@ -59,14 +61,15 @@ public class BaseTransactionLog implements TransactionLog public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - context.enqueueMessage(queues, messageId); - if (queues.size() > 1) { - _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); + if (_logger.isInfoEnabled()) + { + _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); + } //Clone the list incase someone else changes it. - _idToQueues.put(messageId, (ArrayList) queues.clone()); + _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone())); } _delegate.enqueueMessage(context, queues, messageId); @@ -78,21 +81,33 @@ public class BaseTransactionLog implements TransactionLog if (context.inTransaction()) { - Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap(); + + Map<Long, List<AMQQueue>> messageMap = context.getDequeueMap(); //For each Message ID that is in the map check - Iterator iterator = messageMap.keySet().iterator(); + Set<Long> messageIDs = messageMap.keySet(); - while (iterator.hasNext()) + synchronized (messageMap) { - Long messageID = (Long) iterator.next(); - //If we don't have a gloabl reference for this message then there is only a single enqueue - if (_idToQueues.get(messageID) == null) + if (_logger.isInfoEnabled()) { - // Add the removal of the message to this transaction - _delegate.removeMessage(context,messageID); - // Remove this message ID as we have processed it so we don't reprocess after the main commmit - iterator.remove(); + _logger.info("Pre-Processing single dequeue of:" + messageIDs); + } + + Iterator iterator = messageIDs.iterator(); + + while (iterator.hasNext()) + { + Long messageID = (Long) iterator.next(); + //If we don't have a gloabl reference for this message then there is only a single enqueue + //can check here to see if this is the last reference? + if (_idToQueues.get(messageID) == null) + { + // Add the removal of the message to this transaction + _delegate.removeMessage(context, messageID); + // Remove this message ID as we have processed it so we don't reprocess after the main commmit + iterator.remove(); + } } } } @@ -136,22 +151,19 @@ public class BaseTransactionLog implements TransactionLog public void abortTran(StoreContext context) throws AMQException { - // If we have enqueues to rollback - processDequeues(context.getEnqueueMap()); - //Abort the recorded state for this transaction. context.abortTransaction(); _delegate.abortTran(context); } - private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap) + private void processDequeues(Map<Long, List<AMQQueue>> messageMap) throws AMQException { // Check we have dequeues to process then process them if (messageMap == null || messageMap.isEmpty()) { - return; + return; } // Process any enqueues to bring our model up to date. @@ -162,50 +174,77 @@ public class BaseTransactionLog implements TransactionLog //Batch Process the Dequeues on the delegate _delegate.beginTran(removeContext); + removeContext.beginTransaction(); try { //For each Message ID Decrement the reference for each of the queues it was on. - for (Long messageID : messageIDs) + + synchronized (messageMap) { - ArrayList<AMQQueue> queueList = messageMap.get(messageID); + if (_logger.isInfoEnabled()) + { + _logger.info("Processing Dequeue for:" + messageIDs); + } - // For each of the queues decrement the reference - for (AMQQueue queue : queueList) + Iterator<Long> messageIDIterator = messageIDs.iterator(); + + while(messageIDIterator.hasNext()) { - ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID); + Long messageID = messageIDIterator.next(); + List<AMQQueue> queueList = messageMap.get(messageID); - // If we have no mapping then this message was only enqueued on a single queue - // This will be the case when we are not in a larger transaction - if (enqueuedList == null) - { - _delegate.removeMessage(removeContext, messageID); - } - else + //Remove this message from our DequeueMap as we are processing it. + messageIDIterator.remove(); + + // For each of the queues decrement the reference + for (AMQQueue queue : queueList) { - //When a message is on more than one queue it is possible that this code section is exectuted - // by one thread per enqueue. - // It is however, thread safe because there is only removes being performed and so the - // last thread that does the remove will see the empty queue and remove the message - // At this stage there is nothing that is going to cause this operation to abort. So we don't - // need to worry about any potential adds. - // The message will no longer be enqueued as that operation has been committed before now so - // this is clean up of the data. - - // Update the enqueued list - enqueuedList.remove(queue); - - // If the list is now empty then remove the message - if (enqueuedList.isEmpty()) + List<AMQQueue> enqueuedList = _idToQueues.get(messageID); + + if (_logger.isInfoEnabled()) + { + _logger.info("Dequeue message:" + messageID + " from :" + queue); + } + + + // If we have no mapping then this message was only enqueued on a single queue + // This will be the case when we are not in a larger transaction + if (enqueuedList == null) { _delegate.removeMessage(removeContext, messageID); - //Remove references list - _idToQueues.remove(messageID); + } + else + { + //When a message is on more than one queue it is possible that this code section is exectuted + // by one thread per enqueue. + // It is however, thread safe because there is only removes being performed and so the + // last thread that does the remove will see the empty queue and remove the message + // At this stage there is nothing that is going to cause this operation to abort. So we don't + // need to worry about any potential adds. + // The message will no longer be enqueued as that operation has been committed before now so + // this is clean up of the data. + synchronized (enqueuedList) + { + // Update the enqueued list but if the queue is not in the list then we are trying + // to dequeue something that is not there anymore, or was never there. + if (!enqueuedList.remove(queue)) + { + throw new UnableToDequeueException(messageID, queue); + } + + // If the list is now empty then remove the message + if (enqueuedList.isEmpty()) + { + _delegate.removeMessage(removeContext, messageID); + //Remove references list + _idToQueues.remove(messageID); + } + } } } } } - //Commit the removes on the delegate. _delegate.commitTran(removeContext); // Mark this context as committed. @@ -244,4 +283,18 @@ public class BaseTransactionLog implements TransactionLog { return _delegate; } + + private class UnableToDequeueException extends RuntimeException + { + Long _messageID; + AMQQueue _queue; + + public UnableToDequeueException(Long messageID, AMQQueue queue) + { + super("Unable to dequeue message(" + messageID + ") from queue " + + "(" + queue + ") it is not/nolonger enqueued on it."); + _messageID = messageID; + _queue = queue; + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java index 73d57df6e6..b2054c3436 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java @@ -77,6 +77,8 @@ public interface TransactionLog /** * Places a message onto a specified queue, in a given transactional context. * + * This method need not be thread safe as it is only called by the message delivery thread + * * @param context The transactional context for the operation. * @param queues *@param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException @@ -86,6 +88,8 @@ public interface TransactionLog /** * Extracts a message from a specified queue, in a given transactional context. * + * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread + * * @param context The transactional context for the operation. * @param queue * @param messageId The message to dequeue. @throws AMQException If the operation fails for any reason, or if the specified message does not exist. @@ -95,6 +99,8 @@ public interface TransactionLog /** * Remove the specified message from the log * + * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread + * * @param context The transactional context for the operation * @param messageId The message to remove * @throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java index a0c38ff0ad..c3ccde3844 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -51,11 +51,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog TestTransactionLog _transactionLog; private ArrayList<AMQQueue> _queues; private MockPersistentAMQMessage _message; + StoreContext _context; public void setUp() throws Exception { super.setUp(); _transactionLog = new TestableBaseTransactionLog(this); + _context = new StoreContext(); } public void testSingleEnqueueNoTransactional() throws AMQException @@ -64,13 +66,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog _message = new MockPersistentAMQMessage(1L, this); - _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + _message.addContentBodyFrame(_context, new MockContentChunk(100), true); MessagePublishInfo mpi = new MessagePublishInfoImpl(); ContentHeaderBody chb = new ContentHeaderBody(); - _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + _message.setPublishAndContentHeaderBody(_context, mpi, chb); verifyMessageStored(_message.getMessageId()); // Enqueue @@ -79,7 +81,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog MockAMQQueue queue = new MockAMQQueue(this.getName()); _queues.add(queue); - _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId()); verifyEnqueuedOnQueues(_message.getMessageId(), _queues); } @@ -89,14 +91,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testSingleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId()); verifyMessageRemoved(_message.getMessageId()); } public void testSingleEnqueueTransactional() throws AMQException { - StoreContext context = new StoreContext(); + StoreContext context = _context; _transactionLog.beginTran(context); @@ -133,7 +135,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testSingleEnqueueTransactional(); - StoreContext context = new StoreContext(); + StoreContext context = _context; _transactionLog.beginTran(context); @@ -150,13 +152,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog _message = new MockPersistentAMQMessage(1L, this); - _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true); + _message.addContentBodyFrame(_context, new MockContentChunk(100), true); MessagePublishInfo mpi = new MessagePublishInfoImpl(); ContentHeaderBody chb = new ContentHeaderBody(); - _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb); + _message.setPublishAndContentHeaderBody(_context, mpi, chb); verifyMessageStored(_message.getMessageId()); // Enqueue @@ -172,7 +174,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog queue = new MockAMQQueue(this.getName() + "3"); _queues.add(queue); - _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId()); + _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId()); verifyEnqueuedOnQueues(_message.getMessageId(), _queues); } @@ -182,7 +184,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testMultipleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId()); ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); @@ -192,7 +194,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog verifyEnqueuedOnQueues(_message.getMessageId(), _queues); verifyMessageStored(_message.getMessageId()); - _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId()); assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); _queues.remove(0); @@ -218,7 +220,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog verifyMessageStored(_message.getMessageId()); - _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId()); verifyMessageRemoved(_message.getMessageId()); } @@ -233,7 +235,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog public void testMultipleEnqueueTransactional() throws AMQException { - StoreContext context = new StoreContext(); + StoreContext context = _context; _transactionLog.beginTran(context); @@ -276,7 +278,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testMultipleEnqueueTransactional(); - StoreContext context = new StoreContext(); + StoreContext context = _context; _transactionLog.beginTran(context); @@ -319,7 +321,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testMultipleEnqueueTransactional(); - StoreContext context = new StoreContext(); + StoreContext context = _context; _transactionLog.beginTran(context); |
