summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-11 01:57:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-11 01:57:36 +0000
commitb7a3c353a8cab215c9244702569d0ec65d8f4c7b (patch)
treea92c3b7a6f0fae652d79b1ac89b918f68b6b73fd /qpid/java
parente8e46f3b33179061346b69c2195bcb6b253b377d (diff)
downloadqpid-python-b7a3c353a8cab215c9244702569d0ec65d8f4c7b.tar.gz
QPID-1800: modify SAMQQ to record transactionlog etry even when queue isnt durable(consistent with restof broker) and send both current and new queue to
BTL for enqueue to ensure references are kept properly. Update BTL to check for prevous enqueues and record new enques in any existing list, despatching only new enqueues to the delegate merged from trunk r764075 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-release@764148 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java43
2 files changed, 65 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 6805d8261e..6a19acddd7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -823,6 +823,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+
+ if (toQueue.equals(this))
+ {
+ //nothing to do here, message is already at the requested destination
+ return;
+ }
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -848,19 +854,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
- if (entry.isPersistent() && toQueue.isDurable())
+ if (entry.isPersistent())
{
//FIXME
//fixme
- ArrayList list = new ArrayList();
+
+ // Creating a list with the destination queue AND the current queue.
+ // This is a hack to ensure a reference is kept in the TLog to the new destination when dequeing
+ // the old destination below, thus preventing incorrect removal of the message from the store
+ ArrayList<AMQQueue> list = new ArrayList<AMQQueue>();
list.add(toQueue);
+ list.add(this);
transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
}
- // Commit and flush the move transcations.
+ // Commit and flush the move transactions.
try
{
transactionLog.commitTran(storeContext);
@@ -891,7 +902,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
toQueue.enqueue(storeContext, entry.getMessage());
// As we only did a dequeue above now that we have moved the message we should perform a delete.
// We cannot do this earlier as the message will be lost if flowed.
- //entry.delete();
+ entry.delete();
}
}
catch (MessageCleanupException e)
@@ -913,6 +924,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+ if (toQueue.equals(this))
+ {
+ //nothing to do here, message is already at the requested destination
+ return;
+ }
+
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -944,11 +961,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
- if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && entry.isPersistent())
{
//fixme
//FIXME
+
+ // Creating a list with the destination queue AND the current queue.
+ // This is a hack to ensure a reference is kept in the TLog to the old destination when enqueing
ArrayList list = new ArrayList();
+ list.add(this);
list.add(toQueue);
transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
index 78efb8b9f7..7bf78ad977 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -67,12 +67,47 @@ public class BaseTransactionLog implements TransactionLog
{
_logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
}
+
+ //list to hold which new queues to enqueue the message on
+ ArrayList<AMQQueue> toEnqueueList = new ArrayList<AMQQueue>();
+
+ List<AMQQueue> enqueuedList = _idToQueues.get(messageId);
+ if (enqueuedList != null)
+ {
+ //There are previous enqueues for this messageId
+ synchronized (enqueuedList)
+ {
+ for(AMQQueue queue : queues)
+ {
+ if(!enqueuedList.contains(queue))
+ {
+ //update the old list.
+ enqueuedList.add(queue);
+ //keep track of new enqueues to be made
+ toEnqueueList.add(queue);
+ }
+ }
+ }
+
+ if(toEnqueueList.isEmpty())
+ {
+ //no new queues to enqueue message on
+ return;
+ }
+ }
+ else
+ {
+ //No existing list, add all provided queues (cloning toEnqueueList in case someone else changes original).
+ toEnqueueList.addAll(queues);
+ _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)toEnqueueList.clone()));
+ }
- //Clone the list incase someone else changes it.
- _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
+ _delegate.enqueueMessage(context, toEnqueueList, messageId);
+ }
+ else
+ {
+ _delegate.enqueueMessage(context, queues, messageId);
}
-
- _delegate.enqueueMessage(context, queues, messageId);
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException