summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-25 15:25:32 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-25 15:25:32 +0000
commit6b02c4c08fdc26de6e048357111d9e0b85ab4927 (patch)
treee18a361536601b5f837b110e5a5651f1bdd7b487 /qpid/java
parent42bfb186da9e911c208f22dd5f6c794b9bddd859 (diff)
downloadqpid-python-6b02c4c08fdc26de6e048357111d9e0b85ab4927.tar.gz
QPID-5907 : [Java Broker] Remove unreferenced messages from the store in asynchronous store recoverer process
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613449 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java21
1 files changed, 19 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 9742404225..750efc23ae 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -141,10 +142,26 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
private synchronized void completeRecovery()
{
// at this point nothing should be writing to the map of recovered messages
- for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values())
+ for (Map.Entry<Long,MessageReference<? extends ServerMessage<?>>> entry : _recoveredMessages.entrySet())
{
- entry.release();
+ entry.getValue().release();
+ entry.setValue(null); // free up any memory associated with the reference object
}
+ getStore().visitMessages(new MessageHandler()
+ {
+ @Override
+ public boolean handle(final StoredMessage<?> storedMessage)
+ {
+
+ long messageNumber = storedMessage.getMessageNumber();
+ if(!_recoveredMessages.containsKey(messageNumber))
+ {
+ _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
+ return messageNumber <_maxMessageId-1;
+ }
+ });
_recoveredMessages.clear();
}