summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
commit42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch)
tree5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java/bdbstore
parent9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff)
downloadqpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java43
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java5
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java12
3 files changed, 35 insertions, 25 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index cf187fe1e9..be592a0d42 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -55,7 +55,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -123,14 +122,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ return new StoredBDBMessage<T>(newMessageId, metaData);
}
public long getNextMessageId()
@@ -1049,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected abstract Logger getLogger();
- private class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageId;
@@ -1177,8 +1169,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public synchronized StoreFuture flushToStore()
+ synchronized StoreFuture flushToStore()
{
if(!stored())
{
@@ -1229,6 +1220,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
private Transaction _txn;
private int _storeSizeIncrease;
+ private final List<Runnable> _onCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1250,8 +1242,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _onCommitActions.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ }
+ });
+
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1269,16 +1269,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
+ private void doPreCommitActions()
+ {
+ for(Runnable action : _onCommitActions)
+ {
+ action.run();
+ }
+ _onCommitActions.clear();
+ }
+
@Override
public StoreFuture commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
-
+ doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1287,7 +1296,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
-
+ _onCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 8fb011152c..1f4cf45ce1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -27,6 +27,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -131,9 +132,9 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
@Override
- public boolean isDurable()
+ public MessageDurability getMessageDurability()
{
- return true;
+ return MessageDurability.DEFAULT;
}
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a96dc8b142..bace585e56 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -51,9 +54,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -113,7 +113,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
storedMessage_0_8.addContent(0, firstContentBytes_0_8);
storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -132,7 +132,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
long messageid_0_10 = storedMessage_0_10.getMessageNumber();
storedMessage_0_10.addContent(0, completeContentBody_0_10);
- storedMessage_0_10.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -387,7 +387,7 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
- storedMessage_0_8.flushToStore();
+ ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
return storedMessage_0_8;
}