From 8368bc980e2989299f032749d62f7ae20ceec4da Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 21 Jan 2012 19:58:14 +0000 Subject: QPID-3774 : Work around Java BDB issue with cursors and flushLog git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1234410 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBMessageStore.java | 88 ++++++---------------- 1 file changed, 25 insertions(+), 63 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 045fe3b1f2..8884a99923 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; import com.sleepycat.bind.tuple.StringBinding; import com.sleepycat.je.*; @@ -899,7 +900,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore boolean complete = false; com.sleepycat.je.Transaction tx = null; - Cursor cursor = null; Random rand = null; int attempts = 0; try @@ -907,7 +907,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore do { tx = null; - cursor = null; try { tx = _environment.beginTransaction(null, null); @@ -936,74 +935,41 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore //now remove the content data from the store if there is any. - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - MessageContentKey_5 mck = new MessageContentKey_5(messageId,0); - - TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_5(); - contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); - //Use a partial record for the value to prevent retrieving the - //data itself as we only need the key to identify what to remove. - DatabaseEntry value = new DatabaseEntry(); - value.setPartial(0, 0, true); - cursor = _messageContentDb.openCursor(tx, null); - - status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW); - while (status == OperationStatus.SUCCESS) + int offset = 0; + do { - mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry); - - if(mck.getMessageId() != messageId) + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset); + TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_5(); + contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry); + //Use a partial record for the value to prevent retrieving the + //data itself as we only need the key to identify what to remove. + DatabaseEntry value = new DatabaseEntry(); + value.setPartial(0, 4, true); + + status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED); + + if(status == OperationStatus.SUCCESS) { - //we have exhausted all chunks for this message id, break - break; - } - else - { - status = cursor.delete(); - - if(status == OperationStatus.NOTFOUND) - { - cursor.close(); - cursor = null; - - tx.abort(); - throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId); - } + offset += IntegerBinding.entryToInt(value); + _messageContentDb.delete(tx, contentKeyEntry); if (_log.isDebugEnabled()) { _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId); } } - - status = cursor.getNext(contentKeyEntry, value, LockMode.RMW); } - - cursor.close(); - - cursor = null; + while (status == OperationStatus.SUCCESS); commit(tx, sync); complete = true; + tx = null; } catch (LockConflictException e) { - try - { - if(cursor != null) - { - cursor.close(); - } - } - catch(DatabaseException e1) - { - _log.warn("Unable to close cursor after LockConflictException", e1); - // rethrow the original log conflict exception, the secondary exception should already have - // been logged. - throw e; - } try { if(tx != null) @@ -1056,13 +1022,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { try { - if(cursor != null) - { - cursor.close(); - cursor = null; - } - tx.abort(); + tx = null; } catch (DatabaseException e1) { @@ -1074,15 +1035,16 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } finally { - if(cursor != null) + if (tx != null) { try { - cursor.close(); + tx.abort(); + tx = null; } - catch (DatabaseException e) + catch (DatabaseException e1) { - throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e); + throw new AMQStoreException("Error aborting transaction " + e1, e1); } } } -- cgit v1.2.1