diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 15:25:32 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-25 15:25:32 +0000 |
| commit | 6b02c4c08fdc26de6e048357111d9e0b85ab4927 (patch) | |
| tree | e18a361536601b5f837b110e5a5651f1bdd7b487 /qpid/java | |
| parent | 42bfb186da9e911c208f22dd5f6c794b9bddd859 (diff) | |
| download | qpid-python-6b02c4c08fdc26de6e048357111d9e0b85ab4927.tar.gz | |
QPID-5907 : [Java Broker] Remove unreferenced messages from the store in asynchronous store recoverer process
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613449 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 9742404225..750efc23ae 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; @@ -141,10 +142,26 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private synchronized void completeRecovery() { // at this point nothing should be writing to the map of recovered messages - for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values()) + for (Map.Entry<Long,MessageReference<? extends ServerMessage<?>>> entry : _recoveredMessages.entrySet()) { - entry.release(); + entry.getValue().release(); + entry.setValue(null); // free up any memory associated with the reference object } + getStore().visitMessages(new MessageHandler() + { + @Override + public boolean handle(final StoredMessage<?> storedMessage) + { + + long messageNumber = storedMessage.getMessageNumber(); + if(!_recoveredMessages.containsKey(messageNumber)) + { + _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } + return messageNumber <_maxMessageId-1; + } + }); _recoveredMessages.clear(); } |
