summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-09 11:55:32 +0000
committerKeith Wall <kwall@apache.org>2014-06-09 11:55:32 +0000
commit715eafb68075336fbc72d95d9a5e2b0f82d432f8 (patch)
tree927e9b3e9d2a8df2fd31e239e73455f80aaed541 /qpid/java/bdbstore/src
parente2093e6efc74e01d57df9ae6873d58ceb737055c (diff)
downloadqpid-python-715eafb68075336fbc72d95d9a5e2b0f82d432f8.tar.gz
QPID-5801: [Java Broker] BDB: Cache the sequence handler used for message sequence number
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1601351 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java34
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java69
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java68
4 files changed, 146 insertions, 33 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index a116acf59f..279762eacd 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -130,8 +129,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
private volatile Committer _committer;
- private boolean _isMessageStoreProvider;
-
private String _storeLocation;
private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
private ConfiguredObject<?> _parent;
@@ -241,8 +238,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
for (ConfiguredObjectRecord record : configuredObjects.values())
{
- boolean shoudlContinue = handler.handle(record);
- if (!shoudlContinue)
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
{
break;
}
@@ -670,27 +667,18 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
- Sequence mmdSeq = null;
- try
- {
- mmdSeq = getMessageMetaDataSeqDb().openSequence(null, MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG);
- long newMessageId = mmdSeq.get(null, 1);
+ Sequence mmdSeq = _environmentFacade.openSequence(getMessageMetaDataSeqDb(),
+ MESSAGE_METADATA_SEQ_KEY,
+ MESSAGE_METADATA_SEQ_CONFIG);
+ long newMessageId = mmdSeq.get(null, 1);
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ if (metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData);
}
- finally
+ else
{
- if (mmdSeq != null)
- {
- mmdSeq.close();
- }
+ return new StoredMemoryMessage<T>(newMessageId, metaData);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index c614ba1367..8e62b4c476 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -26,9 +26,12 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
public interface EnvironmentFacade
@@ -43,7 +46,9 @@ public interface EnvironmentFacade
Environment getEnvironment();
- Database openDatabase(String name, DatabaseConfig databaseConfig);
+ Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
+
+ Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
Committer createCommitter(String name);
@@ -57,4 +62,5 @@ public interface EnvironmentFacade
void closeDatabase(String name);
void close();
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index f41eca602b..ee7ea79e8e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -21,9 +21,14 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl;
import org.apache.log4j.Logger;
import com.sleepycat.je.Database;
@@ -40,6 +45,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private final String _storePath;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
private Environment _environment;
@@ -105,20 +111,44 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
@Override
public void close()
{
+ closeSequences();
closeDatabases();
closeEnvironment();
}
+ private void closeSequences()
+ {
+ RuntimeException firstThrownException = null;
+ for (DatabaseEntry sequenceKey : _cachedSequences.keySet())
+ {
+ try
+ {
+ closeSequence(sequenceKey);
+ }
+ catch(DatabaseException de)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = de;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- for (Database database : _cachedDatabases.values())
+ for (String databaseName : _cachedDatabases.keySet())
{
try
{
- database.close();
+ closeDatabase(databaseName);
}
- catch(RuntimeException e)
+ catch(DatabaseException e)
{
if (firstThrownException == null)
{
@@ -223,6 +253,39 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
+ public Sequence openSequence(final Database database,
+ final DatabaseEntry sequenceKey,
+ final SequenceConfig sequenceConfig)
+ {
+ Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+ if (cachedSequence == null)
+ {
+ Sequence handle = database.openSequence(null, sequenceKey, sequenceConfig);
+ Sequence existingHandle = _cachedSequences.putIfAbsent(sequenceKey, handle);
+ if (existingHandle == null)
+ {
+ cachedSequence = handle;
+ }
+ else
+ {
+ cachedSequence = existingHandle;
+ handle.close();
+ }
+ }
+ return cachedSequence;
+ }
+
+
+ private void closeSequence(final DatabaseEntry sequenceKey)
+ {
+ Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+ if (cachedHandle != null)
+ {
+ cachedHandle.close();
+ }
+ }
+
+ @Override
public void closeDatabase(final String name)
{
Database cachedHandle = _cachedDatabases.remove(name);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 748f10a01a..7a999374e8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -42,6 +42,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
@@ -160,6 +163,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
@@ -242,6 +246,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
+ closeSequences();
closeDatabases();
}
finally
@@ -377,6 +382,38 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
}
+ @Override
+ public Sequence openSequence(final Database database,
+ final DatabaseEntry sequenceKey,
+ final SequenceConfig sequenceConfig)
+ {
+ Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+ if (cachedSequence == null)
+ {
+ Sequence handle = database.openSequence(null, sequenceKey, sequenceConfig);
+ Sequence existingHandle = _cachedSequences.putIfAbsent(sequenceKey, handle);
+ if (existingHandle == null)
+ {
+ cachedSequence = handle;
+ }
+ else
+ {
+ cachedSequence = existingHandle;
+ handle.close();
+ }
+ }
+ return cachedSequence;
+ }
+
+
+ private void closeSequence(final DatabaseEntry sequenceKey)
+ {
+ Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+ if (cachedHandle != null)
+ {
+ cachedHandle.close();
+ }
+ }
@Override
public String getStoreLocation()
@@ -676,11 +713,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
createReplicationGroupAdmin().removeMember(nodeName);
}
- public void updateAddress(final String nodeName, final String newHostName, final int newPort)
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
-
public long getJoinTime()
{
return _joinTime;
@@ -782,7 +814,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
{
- if (LOGGER.isInfoEnabled())
+ if (LOGGER.isDebugEnabled())
{
LOGGER.debug(
"When restarting a state change event is received on NOOP listener for state:"
@@ -794,6 +826,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
+ closeSequences();
closeDatabases();
}
catch(Exception e)
@@ -810,6 +843,29 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ private void closeSequences()
+ {
+ RuntimeException firstThrownException = null;
+ for (DatabaseEntry sequenceKey : _cachedSequences.keySet())
+ {
+ try
+ {
+ closeSequence(sequenceKey);
+ }
+ catch(DatabaseException de)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = de;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
private void closeDatabases()
{
RuntimeException firstThrownException = null;