diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-18 11:11:02 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-18 11:11:02 +0000 |
| commit | a4baab8f1cb12209d8cd624e28caee241553b252 (patch) | |
| tree | 79cffa16199744d4512469abdfb1dfdb65e746be /qpid/java/bdbstore | |
| parent | 10a83a7d00452a59bb1223307ce6cb542b4b6039 (diff) | |
| download | qpid-python-a4baab8f1cb12209d8cd624e28caee241553b252.tar.gz | |
QPID-4999 : [Java Broker] Strip selector arguments from persistent bindings to non-topic exchanges created by buggy old clients
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504429 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
| -rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java | 122 |
1 files changed, 98 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index f6b7e1790f..017b02ac7f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -21,20 +21,10 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.*; +import com.sleepycat.je.Transaction; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -85,15 +75,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private Environment _environment; - private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; - private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; - private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private String DELIVERYDB_NAME = "QUEUE_ENTRIES"; - private String BRIDGEDB_NAME = "BRIDGES"; - private String LINKDB_NAME = "LINKS"; - private String XIDDB_NAME = "XIDS"; + private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; + private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; + private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; + private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; + private static String BRIDGEDB_NAME = "BRIDGES"; + private static String LINKDB_NAME = "LINKS"; + private static String XIDDB_NAME = "XIDS"; + private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; private Database _configuredObjectsDb; + private Database _configVersionDb; private Database _messageMetaDataDb; private Database _messageContentDb; private Database _deliveryDb; @@ -326,6 +318,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo dbConfig.setReadOnly(false); _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); + _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); @@ -426,10 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - recoveryHandler.beginConfigurationRecovery(this); + final int configVersion = getConfigVersion(); + recoveryHandler.beginConfigurationRecovery(this, configVersion); loadConfiguredObjects(recoveryHandler); - recoveryHandler.completeConfigurationRecovery(); + final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } } catch (DatabaseException e) { @@ -438,6 +436,66 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } + private void updateConfigVersion(int newConfigVersion) throws AMQStoreException + { + Cursor cursor = null; + try + { + Transaction txn = _environment.beginTransaction(null, null); + cursor = _configVersionDb.openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0,key); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + IntegerBinding.intToEntry(newConfigVersion, value); + OperationStatus status = cursor.put(key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error setting config version: " + status); + } + } + cursor.close(); + cursor = null; + txn.commit(); + } + finally + { + closeCursorSafely(cursor); + } + + } + + private int getConfigVersion() throws AMQStoreException + { + Cursor cursor = null; + try + { + cursor = _configVersionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + return IntegerBinding.entryToInt(value); + } + + // Insert 0 as the default config version + IntegerBinding.intToEntry(0,value); + ByteBinding.byteToEntry((byte) 0,key); + OperationStatus status = _configVersionDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error initialising config version: " + status); + } + return 0; + } + finally + { + closeCursorSafely(cursor); + } + } + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; @@ -750,9 +808,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } + @Override public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException { + update(id, type, attributes, null); + } + + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(record.getId(), record.getType(), record.getAttributes(), txn); + } + txn.commit(); + } + + private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException + { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Updating " +type + ", id: " + id); @@ -768,14 +842,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry newValue = new DatabaseEntry(); ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); + OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); - status = _configuredObjectsDb.put(null, key, newValue); + status = _configuredObjectsDb.put(txn, key, newValue); if (status != OperationStatus.SUCCESS) { throw new AMQStoreException("Error updating queue details within the store: " + status); |
