summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java307
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java2
2 files changed, 187 insertions, 122 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index f900159808..1d8187401d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -32,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.*;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
@@ -43,6 +46,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -70,17 +74,6 @@ import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -91,7 +84,7 @@ import com.sleepycat.je.TransactionConfig;
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -205,18 +198,15 @@ public class BDBMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- _logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
- if(_configured)
+ if(!_configured)
{
- throw new Exception("ConfigStore already configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
- configure(name,storeConfiguration);
-
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
recover(recoveryHandler);
stateTransition(State.RECOVERING, State.STARTED);
@@ -227,24 +217,31 @@ public class BDBMessageStore implements MessageStore
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
-
+
recoverMessages(recoveryHandler);
}
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+ CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
+
if(!_configured)
{
- throw new Exception("ConfigStore not configured");
+ _logSubject = logSubject;
+ configure(name,storeConfiguration);
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
}
recoverQueueEntries(recoveryHandler);
@@ -252,7 +249,7 @@ public class BDBMessageStore implements MessageStore
}
- public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
{
return new BDBTransaction();
}
@@ -686,8 +683,6 @@ public class BDBMessageStore implements MessageStore
{
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
-
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
@@ -695,7 +690,7 @@ public class BDBMessageStore implements MessageStore
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- long messageId = (Long) keyBinding.entryToObject(key);
+ long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
@@ -781,10 +776,15 @@ public class BDBMessageStore implements MessageStore
*
* @param messageId Identifies the message to remove.
*
- * @throws AMQInternalException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void removeMessage(Long messageId) throws AMQStoreException
+ public void removeMessage(long messageId) throws AMQStoreException
+ {
+ removeMessage(messageId, true);
+ }
+ public void removeMessage(long messageId, boolean sync) throws AMQStoreException
{
+
// _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
@@ -796,8 +796,7 @@ public class BDBMessageStore implements MessageStore
//remove the message meta data from the store
DatabaseEntry key = new DatabaseEntry();
- EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
- metaKeyBindingTuple.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
if (_log.isDebugEnabled())
{
@@ -808,9 +807,8 @@ public class BDBMessageStore implements MessageStore
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- tx.abort();
-
- throw new AMQStoreException("Message metadata not found for message id " + messageId);
+ _log.info("Message not found (attempt to remove failed - probably application initiated rollback) " +
+ messageId);
}
if (_log.isDebugEnabled())
@@ -868,7 +866,7 @@ public class BDBMessageStore implements MessageStore
cursor.close();
cursor = null;
- commit(tx, true);
+ commit(tx, sync);
}
catch (DatabaseException e)
{
@@ -1174,11 +1172,12 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
// _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
- AMQShortString name = new AMQShortString(queue.getResourceName());
+ AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
@@ -1212,7 +1211,8 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
+ long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
@@ -1383,7 +1383,7 @@ public class BDBMessageStore implements MessageStore
*
* @return A fresh message id.
*/
- public Long getNewMessageId()
+ public long getNewMessageId()
{
return _messageId.incrementAndGet();
}
@@ -1398,7 +1398,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
@@ -1436,7 +1436,8 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, long messageId,
+ StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
@@ -1446,8 +1447,7 @@ public class BDBMessageStore implements MessageStore
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1475,7 +1475,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
+ public StorableMessageMetaData getMessageMetaData(long messageId) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1484,8 +1484,7 @@ public class BDBMessageStore implements MessageStore
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
@@ -1519,7 +1518,7 @@ public class BDBMessageStore implements MessageStore
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
+ public int getContent(long messageId, int offset, ByteBuffer dst) throws AMQStoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
@@ -1778,7 +1777,6 @@ public class BDBMessageStore implements MessageStore
{
_log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
}
-
_complete = true;
notifyAll();
@@ -1799,7 +1797,7 @@ public class BDBMessageStore implements MessageStore
{
//_log.debug("public void commit(): called");
- _commitThread.addJob(this);
+ _commitThread.addJob(this, _syncCommit);
if(!_syncCommit)
{
@@ -1807,28 +1805,14 @@ public class BDBMessageStore implements MessageStore
return;
}
- synchronized (BDBCommitFuture.this)
- {
- while (!_complete)
- {
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- // _log.error("Unexpected thread interruption: " + e, e);
- throw new RuntimeException(e);
- }
- }
+ waitForCompletion();
+ // _log.debug("Commit completed, _databaseException = " + _databaseException);
- // _log.debug("Commit completed, _databaseException = " + _databaseException);
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
+ if (_databaseException != null)
+ {
+ throw _databaseException;
}
+
}
public synchronized boolean isComplete()
@@ -1836,10 +1820,11 @@ public class BDBMessageStore implements MessageStore
return _complete;
}
- public void waitForCompletion()
+ public synchronized void waitForCompletion()
{
while (!isComplete())
{
+ _commitThread.explicitNotify();
try
{
wait(250);
@@ -1866,7 +1851,7 @@ public class BDBMessageStore implements MessageStore
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
@@ -1877,6 +1862,14 @@ public class BDBMessageStore implements MessageStore
}
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
public void run()
{
while (!_stopped.get())
@@ -1905,24 +1898,25 @@ public class BDBMessageStore implements MessageStore
{
// _log.debug("private void processJobs(): called");
- // we replace the old queue atomically with a new one and this avoids any need to
- // copy elements out of the queue
- Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
+ int size = _jobQueue.size();
try
{
- // _environment.checkpoint(_config);
+ //TODO - upgrade to BDB 5.0, then use: _environment.flushLog(true);
_environment.sync();
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.complete();
}
+
}
catch (DatabaseException e)
{
- for (BDBCommitFuture commit : jobs)
+ for(int i = 0; i < size; i++)
{
+ BDBCommitFuture commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -1931,15 +1925,19 @@ public class BDBMessageStore implements MessageStore
private boolean hasJobs()
{
- return !_jobQueue.get().isEmpty();
+ return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit)
+ public void addJob(BDBCommitFuture commit, final boolean sync)
{
- synchronized (_lock)
+
+ _jobQueue.add(commit);
+ if(sync)
{
- _jobQueue.get().add(commit);
- _lock.notifyAll();
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
}
}
@@ -1959,7 +1957,10 @@ public class BDBMessageStore implements MessageStore
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private com.sleepycat.je.Transaction _txn;
+
+ private StorableMessageMetaData _metaData;
+ private volatile SoftReference<byte[]> _dataRef;
+ private byte[] _data;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1973,22 +1974,15 @@ public class BDBMessageStore implements MessageStore
try
{
_messageId = messageId;
+ _metaData = metaData;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _txn = _environment.beginTransaction(null, null);
- storeMetaData(_txn, messageId, metaData);
- }
+
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
- catch (AMQStoreException e)
- {
- throw new RuntimeException(e);
- }
}
@@ -2018,58 +2012,114 @@ public class BDBMessageStore implements MessageStore
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- try
+ src = src.slice();
+
+ if(_data == null)
{
- BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
}
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
- public StoreFuture flushToStore()
+ public ByteBuffer getContent(int offsetInMessage, int size)
{
- try
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
{
- if(_txn != null)
- {
- //if(_log.isDebugEnabled())
- //{
- // _log.debug("Flushing message " + _messageId + " to store");
- //}
- BDBMessageStore.this.commitTranImpl(_txn, true);
- }
+ return ByteBuffer.wrap(data,offsetInMessage,size);
}
- catch (AMQStoreException e)
+ else
{
- throw new RuntimeException(e);
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
}
- finally
+ }
+
+ synchronized void store(com.sleepycat.je.Transaction txn)
+ {
+
+ if(_metaData != null)
{
- _txn = null;
+ try
+ {
+ _dataRef = new SoftReference<byte[]>(_data);
+ BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
+ BDBMessageStore.this.addContent(txn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ catch(DatabaseException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+ }
+
+ public synchronized StoreFuture flushToStore()
+ {
+ if(_metaData != null)
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ store(txn);
+ BDBMessageStore.this.commit(txn,true);
+
}
return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
try
{
- BDBMessageStore.this.removeMessage(_messageId);
+ BDBMessageStore.this.removeMessage(_messageId, false);
}
catch (AMQStoreException e)
{
@@ -2094,12 +2144,27 @@ public class BDBMessageStore implements MessageStore
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ if(message.getStoredMessage() instanceof StoredBDBMessage)
+ {
+ ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ }
+
+ BDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
{
BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
index 975e558874..68f1e7ce6f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
@@ -33,7 +33,7 @@ public class QueueEntryTB extends TupleBinding<QueueEntryKey>
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- Long messageId = tupleInput.readLong();
+ long messageId = tupleInput.readLong();
return new QueueEntryKey(queueName, messageId);
}