diff options
| author | Keith Wall <kwall@apache.org> | 2012-07-25 22:37:21 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-07-25 22:37:21 +0000 |
| commit | 02dc201883cf730b11754aa7861e1d540624ef42 (patch) | |
| tree | 54dec54a1789955f3ed5881339860af30897a316 /java/broker | |
| parent | d2b59f0faca606a677d3fa7e61667e54ae8c57b5 (diff) | |
| download | qpid-python-02dc201883cf730b11754aa7861e1d540624ef42.tar.gz | |
QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1365832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 36ac8b3d40..bc9cda7f71 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -1575,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore buf = buf.slice(); MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true); messageHandler.message(message); } @@ -2037,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore { private final long _messageId; + private final boolean _isRecovered; + private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -2045,21 +2047,18 @@ public class DerbyMessageStore implements MessageStore StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } StoredDerbyMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StorableMessageMetaData metaData, boolean isRecovered) { _messageId = messageId; - + _isRecovered = isRecovered; _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) - { - _metaData = metaData; - } + _metaData = metaData; } @Override @@ -2140,16 +2139,16 @@ public class DerbyMessageStore implements MessageStore @Override public synchronized StoreFuture flushToStore() { + Connection conn = null; try { - if(_metaData != null) + if(!stored()) { - Connection conn = newConnection(); + conn = newConnection(); store(conn); conn.commit(); - conn.close(); storedSizeChange(getMetaData().getContentSize()); } } @@ -2161,12 +2160,24 @@ public class DerbyMessageStore implements MessageStore } throw new RuntimeException(e); } + finally + { + closeConnection(conn); + } return StoreFuture.IMMEDIATE_FUTURE; } + @Override + public void remove() + { + int delta = getMetaData().getContentSize(); + DerbyMessageStore.this.removeMessage(_messageId); + storedSizeChange(-delta); + } + private synchronized void store(final Connection conn) throws SQLException { - if(_metaData != null) + if (!stored()) { try { @@ -2179,20 +2190,17 @@ public class DerbyMessageStore implements MessageStore _metaData = null; _data = null; } - } - if(_logger.isDebugEnabled()) - { - _logger.debug("Storing message " + _messageId + " to store"); + if(_logger.isDebugEnabled()) + { + _logger.debug("Storing message " + _messageId + " to store"); + } } } - @Override - public void remove() + private boolean stored() { - int delta = getMetaData().getContentSize(); - DerbyMessageStore.this.removeMessage(_messageId); - storedSizeChange(-delta); + return _metaData == null || _isRecovered; } } |
