summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java12
3 files changed, 35 insertions, 25 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index cf187fe1e9..be592a0d42 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ return new StoredBDBMessage<T>(newMessageId, metaData);
}
public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected abstract Logger getLogger();
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public synchronized StoreFuture flushToStore()
+ synchronized StoreFuture flushToStore()
{
if(!stored())
{
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private Transaction _txn;
private int _storeSizeIncrease;
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _onCommitActions.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ });
+
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 8fb011152c..1f4cf45ce1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a96dc8b142..bace585e56 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
storedMessage_0_8.addContent(0, firstContentBytes_0_8);
storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
long messageid_0_10 = storedMessage_0_10.getMessageNumber();
storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
return storedMessage_0_8;
}