summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-18 11:11:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-18 11:11:02 +0000
commita4baab8f1cb12209d8cd624e28caee241553b252 (patch)
tree79cffa16199744d4512469abdfb1dfdb65e746be /qpid/java/bdbstore
parent10a83a7d00452a59bb1223307ce6cb542b4b6039 (diff)
downloadqpid-python-a4baab8f1cb12209d8cd624e28caee241553b252.tar.gz
QPID-4999 : [Java Broker] Strip selector arguments from persistent bindings to non-topic exchanges created by buggy old clients
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504429 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java122
1 files changed, 98 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index f6b7e1790f..017b02ac7f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -21,20 +21,10 @@
package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.*;
+import com.sleepycat.je.Transaction;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -85,15 +75,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
private Environment _environment;
- private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
- private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private String DELIVERYDB_NAME = "QUEUE_ENTRIES";
- private String BRIDGEDB_NAME = "BRIDGES";
- private String LINKDB_NAME = "LINKS";
- private String XIDDB_NAME = "XIDS";
+ private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
+ private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XIDDB_NAME = "XIDS";
+ private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
private Database _configuredObjectsDb;
+ private Database _configVersionDb;
private Database _messageMetaDataDb;
private Database _messageContentDb;
private Database _deliveryDb;
@@ -326,6 +318,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
dbConfig.setReadOnly(false);
_configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
+ _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
_messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
_messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
@@ -426,10 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ final int configVersion = getConfigVersion();
+ recoveryHandler.beginConfigurationRecovery(this, configVersion);
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
}
catch (DatabaseException e)
{
@@ -438,6 +436,66 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
+ private void updateConfigVersion(int newConfigVersion) throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ Transaction txn = _environment.beginTransaction(null, null);
+ cursor = _configVersionDb.openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0,key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ IntegerBinding.intToEntry(newConfigVersion, value);
+ OperationStatus status = cursor.put(key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error setting config version: " + status);
+ }
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ }
+
+ private int getConfigVersion() throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _configVersionDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ return IntegerBinding.entryToInt(value);
+ }
+
+ // Insert 0 as the default config version
+ IntegerBinding.intToEntry(0,value);
+ ByteBinding.byteToEntry((byte) 0,key);
+ OperationStatus status = _configVersionDb.put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error initialising config version: " + status);
+ }
+ return 0;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
@@ -750,9 +808,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
}
}
+
@Override
public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
+ update(id, type, attributes, null);
+ }
+
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(record.getId(), record.getType(), record.getAttributes(), txn);
+ }
+ txn.commit();
+ }
+
+ private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
+ {
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Updating " +type + ", id: " + id);
@@ -768,14 +842,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
DatabaseEntry newValue = new DatabaseEntry();
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
+ OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
- status = _configuredObjectsDb.put(null, key, newValue);
+ status = _configuredObjectsDb.put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Error updating queue details within the store: " + status);