From a4baab8f1cb12209d8cd624e28caee241553b252 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 18 Jul 2013 11:11:02 +0000 Subject: QPID-4999 : [Java Broker] Strip selector arguments from persistent bindings to non-topic exchanges created by buggy old clients git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504429 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 122 +++++++++++++++++---- 1 file changed, 98 insertions(+), 24 deletions(-) (limited to 'qpid/java/bdbstore') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index f6b7e1790f..017b02ac7f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -21,20 +21,10 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.CheckpointConfig; -import com.sleepycat.je.Cursor; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.ExceptionEvent; -import com.sleepycat.je.ExceptionListener; -import com.sleepycat.je.LockConflictException; -import com.sleepycat.je.LockMode; -import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.*; +import com.sleepycat.je.Transaction; import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -85,15 +75,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo private Environment _environment; - private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; - private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; - private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; - private String DELIVERYDB_NAME = "QUEUE_ENTRIES"; - private String BRIDGEDB_NAME = "BRIDGES"; - private String LINKDB_NAME = "LINKS"; - private String XIDDB_NAME = "XIDS"; + private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS"; + private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA"; + private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT"; + private static String DELIVERYDB_NAME = "QUEUE_ENTRIES"; + private static String BRIDGEDB_NAME = "BRIDGES"; + private static String LINKDB_NAME = "LINKS"; + private static String XIDDB_NAME = "XIDS"; + private static String CONFIG_VERSION_DB = "CONFIG_VERSION"; private Database _configuredObjectsDb; + private Database _configVersionDb; private Database _messageMetaDataDb; private Database _messageContentDb; private Database _deliveryDb; @@ -326,6 +318,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo dbConfig.setReadOnly(false); _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig); + _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig); _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig); _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); @@ -426,10 +419,15 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo { try { - recoveryHandler.beginConfigurationRecovery(this); + final int configVersion = getConfigVersion(); + recoveryHandler.beginConfigurationRecovery(this, configVersion); loadConfiguredObjects(recoveryHandler); - recoveryHandler.completeConfigurationRecovery(); + final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } } catch (DatabaseException e) { @@ -438,6 +436,66 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } + private void updateConfigVersion(int newConfigVersion) throws AMQStoreException + { + Cursor cursor = null; + try + { + Transaction txn = _environment.beginTransaction(null, null); + cursor = _configVersionDb.openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + ByteBinding.byteToEntry((byte) 0,key); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + IntegerBinding.intToEntry(newConfigVersion, value); + OperationStatus status = cursor.put(key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error setting config version: " + status); + } + } + cursor.close(); + cursor = null; + txn.commit(); + } + finally + { + closeCursorSafely(cursor); + } + + } + + private int getConfigVersion() throws AMQStoreException + { + Cursor cursor = null; + try + { + cursor = _configVersionDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + return IntegerBinding.entryToInt(value); + } + + // Insert 0 as the default config version + IntegerBinding.intToEntry(0,value); + ByteBinding.byteToEntry((byte) 0,key); + OperationStatus status = _configVersionDb.put(null, key, value); + if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Error initialising config version: " + status); + } + return 0; + } + finally + { + closeCursorSafely(cursor); + } + } + private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException { Cursor cursor = null; @@ -750,8 +808,24 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo } } + @Override public void update(UUID id, String type, Map attributes) throws AMQStoreException + { + update(id, type, attributes, null); + } + + public void update(ConfiguredObjectRecord... records) throws AMQStoreException + { + com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(record.getId(), record.getType(), record.getAttributes(), txn); + } + txn.commit(); + } + + private void update(UUID id, String type, Map attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException { if (LOGGER.isDebugEnabled()) { @@ -768,14 +842,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo DatabaseEntry newValue = new DatabaseEntry(); ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT); + OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); // write the updated entry to the store configuredObjectBinding.objectToEntry(newQueueRecord, newValue); - status = _configuredObjectsDb.put(null, key, newValue); + status = _configuredObjectsDb.put(txn, key, newValue); if (status != OperationStatus.SUCCESS) { throw new AMQStoreException("Error updating queue details within the store: " + status); -- cgit v1.2.1