diff options
Diffstat (limited to 'qpid/java/bdbstore')
3 files changed, 35 insertions, 25 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index cf187fe1e9..be592a0d42 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore long newMessageId = getNextMessageId(); - if (metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage<T>(newMessageId, metaData); - } + return new StoredBDBMessage<T>(newMessageId, metaData); } public long getNextMessageId() @@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> { private final long _messageId; @@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public synchronized StoreFuture flushToStore() + synchronized StoreFuture flushToStore() { if(!stored()) { @@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; + private final List<Runnable> _onCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _onCommitActions.add(new Runnable() + { + @Override + public void run() + { + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + } + }); + } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } + private void doPreCommitActions() + { + for(Runnable action : _onCommitActions) + { + action.run(); + } + _onCommitActions.clear(); + } + @Override public StoreFuture commitTranAsync() throws StoreException { checkMessageStoreOpen(); - + doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void abortTran() throws StoreException { checkMessageStoreOpen(); - + _onCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 8fb011152c..1f4cf45ce1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } @Override - public boolean isDurable() + public MessageDurability getMessageDurability() { - return true; + return MessageDurability.DEFAULT; } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index a96dc8b142..bace585e56 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; @@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.util.FileUtils; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. @@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase storedMessage_0_8.addContent(0, firstContentBytes_0_8); storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase long messageid_0_10 = storedMessage_0_10.getMessageNumber(); storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); /* * reload the store only (read-only) @@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); + ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); return storedMessage_0_8; } |
