diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-09 11:55:32 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-09 11:55:32 +0000 |
| commit | 715eafb68075336fbc72d95d9a5e2b0f82d432f8 (patch) | |
| tree | 927e9b3e9d2a8df2fd31e239e73455f80aaed541 /qpid/java/bdbstore/src | |
| parent | e2093e6efc74e01d57df9ae6873d58ceb737055c (diff) | |
| download | qpid-python-715eafb68075336fbc72d95d9a5e2b0f82d432f8.tar.gz | |
QPID-5801: [Java Broker] BDB: Cache the sequence handler used for message sequence number
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1601351 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
4 files changed, 146 insertions, 33 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java index a116acf59f..279762eacd 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -130,8 +129,6 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi private volatile Committer _committer; - private boolean _isMessageStoreProvider; - private String _storeLocation; private final BDBMessageStore _messageStoreFacade = new BDBMessageStore(); private ConfiguredObject<?> _parent; @@ -241,8 +238,8 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi for (ConfiguredObjectRecord record : configuredObjects.values()) { - boolean shoudlContinue = handler.handle(record); - if (!shoudlContinue) + boolean shouldContinue = handler.handle(record); + if (!shouldContinue) { break; } @@ -670,27 +667,18 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { - Sequence mmdSeq = null; - try - { - mmdSeq = getMessageMetaDataSeqDb().openSequence(null, MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG); - long newMessageId = mmdSeq.get(null, 1); + Sequence mmdSeq = _environmentFacade.openSequence(getMessageMetaDataSeqDb(), + MESSAGE_METADATA_SEQ_KEY, + MESSAGE_METADATA_SEQ_CONFIG); + long newMessageId = mmdSeq.get(null, 1); - if (metaData.isPersistent()) - { - return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); - } - else - { - return new StoredMemoryMessage<T>(newMessageId, metaData); - } + if (metaData.isPersistent()) + { + return (StoredMessage<T>) new StoredBDBMessage(newMessageId, metaData); } - finally + else { - if (mmdSeq != null) - { - mmdSeq.close(); - } + return new StoredMemoryMessage<T>(newMessageId, metaData); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index c614ba1367..8e62b4c476 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -26,9 +26,12 @@ import java.util.Map; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; public interface EnvironmentFacade @@ -43,7 +46,9 @@ public interface EnvironmentFacade Environment getEnvironment(); - Database openDatabase(String name, DatabaseConfig databaseConfig); + Database openDatabase(String databaseName, DatabaseConfig databaseConfig); + + Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig); Committer createCommitter(String name); @@ -57,4 +62,5 @@ public interface EnvironmentFacade void closeDatabase(String name); void close(); + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index f41eca602b..ee7ea79e8e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -21,9 +21,14 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.net.ServerSocket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; +import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl; import org.apache.log4j.Logger; import com.sleepycat.je.Database; @@ -40,6 +45,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade private final String _storePath; private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); private Environment _environment; @@ -105,20 +111,44 @@ public class StandardEnvironmentFacade implements EnvironmentFacade @Override public void close() { + closeSequences(); closeDatabases(); closeEnvironment(); } + private void closeSequences() + { + RuntimeException firstThrownException = null; + for (DatabaseEntry sequenceKey : _cachedSequences.keySet()) + { + try + { + closeSequence(sequenceKey); + } + catch(DatabaseException de) + { + if (firstThrownException == null) + { + firstThrownException = de; + } + } + } + if (firstThrownException != null) + { + throw firstThrownException; + } + } + private void closeDatabases() { RuntimeException firstThrownException = null; - for (Database database : _cachedDatabases.values()) + for (String databaseName : _cachedDatabases.keySet()) { try { - database.close(); + closeDatabase(databaseName); } - catch(RuntimeException e) + catch(DatabaseException e) { if (firstThrownException == null) { @@ -223,6 +253,39 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override + public Sequence openSequence(final Database database, + final DatabaseEntry sequenceKey, + final SequenceConfig sequenceConfig) + { + Sequence cachedSequence = _cachedSequences.get(sequenceKey); + if (cachedSequence == null) + { + Sequence handle = database.openSequence(null, sequenceKey, sequenceConfig); + Sequence existingHandle = _cachedSequences.putIfAbsent(sequenceKey, handle); + if (existingHandle == null) + { + cachedSequence = handle; + } + else + { + cachedSequence = existingHandle; + handle.close(); + } + } + return cachedSequence; + } + + + private void closeSequence(final DatabaseEntry sequenceKey) + { + Sequence cachedHandle = _cachedSequences.remove(sequenceKey); + if (cachedHandle != null) + { + cachedHandle.close(); + } + } + + @Override public void closeDatabase(final String name) { Database cachedHandle = _cachedDatabases.remove(name); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 748f10a01a..7a999374e8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -42,6 +42,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.Committer; @@ -160,6 +163,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile SyncPolicy _messageStoreRemoteTransactionSyncronizationPolicy = REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { @@ -242,6 +246,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { + closeSequences(); closeDatabases(); } finally @@ -377,6 +382,38 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } } + @Override + public Sequence openSequence(final Database database, + final DatabaseEntry sequenceKey, + final SequenceConfig sequenceConfig) + { + Sequence cachedSequence = _cachedSequences.get(sequenceKey); + if (cachedSequence == null) + { + Sequence handle = database.openSequence(null, sequenceKey, sequenceConfig); + Sequence existingHandle = _cachedSequences.putIfAbsent(sequenceKey, handle); + if (existingHandle == null) + { + cachedSequence = handle; + } + else + { + cachedSequence = existingHandle; + handle.close(); + } + } + return cachedSequence; + } + + + private void closeSequence(final DatabaseEntry sequenceKey) + { + Sequence cachedHandle = _cachedSequences.remove(sequenceKey); + if (cachedHandle != null) + { + cachedHandle.close(); + } + } @Override public String getStoreLocation() @@ -676,11 +713,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan createReplicationGroupAdmin().removeMember(nodeName); } - public void updateAddress(final String nodeName, final String newHostName, final int newPort) - { - createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); - } - public long getJoinTime() { return _joinTime; @@ -782,7 +814,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException { - if (LOGGER.isInfoEnabled()) + if (LOGGER.isDebugEnabled()) { LOGGER.debug( "When restarting a state change event is received on NOOP listener for state:" @@ -794,6 +826,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { + closeSequences(); closeDatabases(); } catch(Exception e) @@ -810,6 +843,29 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + private void closeSequences() + { + RuntimeException firstThrownException = null; + for (DatabaseEntry sequenceKey : _cachedSequences.keySet()) + { + try + { + closeSequence(sequenceKey); + } + catch(DatabaseException de) + { + if (firstThrownException == null) + { + firstThrownException = de; + } + } + } + if (firstThrownException != null) + { + throw firstThrownException; + } + } + private void closeDatabases() { RuntimeException firstThrownException = null; |
