summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-06-06 08:26:02 +0000
committerAlex Rudyy <orudyy@apache.org>2014-06-06 08:26:02 +0000
commit53fd008b70676ce1382bec414bcd0d86299a4ced (patch)
tree27c3e38bcc0d4a0551d048e44da19618474bc1e0 /qpid/java
parent60cb3d99e3661103d20cdd7a9d599c62fe2d4b8f (diff)
downloadqpid-python-53fd008b70676ce1382bec414bcd0d86299a4ced.tar.gz
QPID-5715: Fix various issues with configuration upgrader to model 2 and store upgraders to version 8
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java65
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java272
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java17
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java290
8 files changed, 513 insertions, 249 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 39bfdbc6bf..9fc90f9d7c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -25,7 +25,6 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -233,11 +232,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
child.addParent(hk.getParentType(), parent);
}
- else if(hk.getParentType().equals("Exchange"))
- {
- // TODO - remove this hack for the pre-defined exchanges
- child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
- }
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
index faccd2fdf4..05841b86ae 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
@@ -53,6 +53,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
{
private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){};
+ @SuppressWarnings("serial")
+ private Map<String, String> _defaultExchanges = new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }};
+
@Override
public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
@@ -76,10 +85,10 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
}
configVersionDb.close();
+ String virtualHostName = parent.getName();
Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
- virtualHostAttributes.put("name", parent.getName());
- String virtualHostName = parent.getName();
+ virtualHostAttributes.put("name", virtualHostName);
UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
ConfiguredObjectRecord virtualHostRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
@@ -90,33 +99,41 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
objectsCursor = configuredObjectsDb.openCursor(txn, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
+ ObjectMapper mapper = new ObjectMapper();
while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
TupleInput input = TupleBinding.entryToInput(value);
String type = input.readString();
+ String json = input.readString();
+ Map<String,Object> attributes = null;
+ try
+ {
+ attributes = mapper.readValue(json, MAP_TYPE_REFERENCE);
+ }
+ catch (Exception e)
+ {
+ throw new StoreException(e);
+ }
+ String name = (String)attributes.get("name");
+
+ if (type.equals("Exchange"))
+ {
+ _defaultExchanges.remove(name);
+ }
if(!type.endsWith("Binding"))
{
- UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value);
- TupleOutput tupleOutput = new TupleOutput();
- tupleOutput.writeLong(id.getMostSignificantBits());
- tupleOutput.writeLong(id.getLeastSignificantBits());
- tupleOutput.writeString("VirtualHost");
- TupleBinding.outputToEntry(tupleOutput, key);
- hierarchyDb.put(txn, key, value);
+ storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId);
}
else
{
- String json = input.readString();
- ObjectMapper mapper = new ObjectMapper();
try
{
DatabaseEntry hierarchyKey = new DatabaseEntry();
DatabaseEntry hierarchyValue = new DatabaseEntry();
- Map<String,Object> attributes = mapper.readValue(json, MAP_TYPE_REFERENCE);
Object queueIdString = attributes.remove("queue");
if(queueIdString instanceof String)
{
@@ -165,6 +182,17 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
}
storeConfiguredObjectEntry(configuredObjectsDb, txn, virtualHostRecord);
+ for (Map.Entry<String, String> defaultExchangeEntry : _defaultExchanges.entrySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
+ Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+ exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+ exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+ exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+ ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes);
+ storeConfiguredObjectEntry(configuredObjectsDb, txn, exchangeRecord);
+ storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId);
+ }
txn.commit();
hierarchyDb.close();
@@ -173,6 +201,19 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
reportFinished(environment, 8);
}
+ void storeVirtualHostHierarchyRecord(Database hierarchyDb, Transaction txn, UUID id, UUID virtualHostId)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value);
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeLong(id.getMostSignificantBits());
+ tupleOutput.writeLong(id.getLeastSignificantBits());
+ tupleOutput.writeString("VirtualHost");
+ TupleBinding.outputToEntry(tupleOutput, key);
+ hierarchyDb.put(txn, key, value);
+ }
+
private int getConfigVersion(Database configVersionDb)
{
Cursor cursor = null;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
index 9bd5c96fe6..fc7142e9e4 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
@@ -70,8 +70,8 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
UpgradeFrom7To8 upgrade = new UpgradeFrom7To8();
upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 7);
- assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 9);
+ assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11);
+ assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13);
assertConfiguredObjects();
assertConfiguredObjectHierarchy();
@@ -81,7 +81,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
private void assertConfiguredObjectHierarchy()
{
Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy();
- assertEquals("Unexpected number of configured objects", 9, hierarchy.size());
+ assertEquals("Unexpected number of configured objects", 13, hierarchy.size());
UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName());
UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName());
@@ -97,16 +97,21 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName());
assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid);
- // ! amq.direct -> virtualhost (This will change when the upgrader is changed to create the default exchanges)
- UpgradeHierarchyKey amqDirectToVhParent = new UpgradeHierarchyKey(amqDirectUuid, VirtualHost.class.getSimpleName());
- assertFalse("amq.direct should not have a binding to virtualhost", hierarchy.containsKey(amqDirectToVhParent));
-
// queue1binding -> amq.direct
// queue1binding -> queue1
UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName());
UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName());
assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid);
assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid);
+
+ String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"};
+ for (String exchangeName : defaultExchanges)
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName());
+ UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName());
+ assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid);
+ }
+
}
private void assertExpectedHierarchyEntry(
@@ -121,7 +126,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
private void assertConfiguredObjects()
{
Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 7, configuredObjects.size());
+ assertEquals("Unexpected number of configured objects", 11, configuredObjects.size());
Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>();
@@ -137,6 +142,11 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null));
expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null));
+ expected.putAll(createExpectedExchangeMap("amq.direct", "direct"));
+ expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout"));
+ expected.putAll(createExpectedExchangeMap("amq.match", "headers"));
+ expected.putAll(createExpectedExchangeMap("amq.topic", "topic"));
+
MapJsonSerializer jsonSerializer = new MapJsonSerializer();
for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index 6d95c5d3a2..14c9e7d3e0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -632,8 +632,8 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
{
Map<String, Class<? extends ConfiguredObject>> relationships = new HashMap<String, Class<? extends ConfiguredObject>>();
- Collection<Class<? extends ConfiguredObject>> children = _parent.getModel().getChildTypes(Broker.class);
- for (Class<? extends ConfiguredObject> childClass : children)
+ Collection<Class<? extends ConfiguredObject>> categories = _parent.getModel().getSupportedCategories();
+ for (Class<? extends ConfiguredObject> childClass : categories)
{
String name = childClass.getSimpleName().toLowerCase();
String relationshipName = name + (name.endsWith("s") ? "es" : "s");
@@ -812,29 +812,7 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
private Class<? extends ConfiguredObject> findExpectedChildConfiguredObjectClass(String parentFieldName,
Class<? extends ConfiguredObject> parentConfiguredObjectClass)
{
- if (parentConfiguredObjectClass == Broker.class)
- {
- return _brokerChildrenRelationshipMap.get(parentFieldName);
- }
-
- // for non-broker parent classes
- // try to determine the child class from the model by iterating through the children classes
- // for the parent configured object class
- if (parentConfiguredObjectClass != null)
- {
- Collection<Class<? extends ConfiguredObject>> childTypes = _parent.getModel().getChildTypes(parentConfiguredObjectClass);
- for (Class<? extends ConfiguredObject> childType : childTypes)
- {
- String relationship = childType.getSimpleName().toLowerCase();
- relationship += relationship.endsWith("s") ? "es": "s";
- if (parentFieldName.equals(relationship))
- {
- return childType;
- }
- }
- }
-
- return null;
+ return _brokerChildrenRelationshipMap.get(parentFieldName);
}
private Object toObject(JsonNode node)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 2f83b34692..e0c1f77d2b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -306,11 +306,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
child.addParent(parentType, parent);
}
- else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
- {
- // TODO - remove this hack for amq. exchanges
- child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
- }
}
}
finally
@@ -460,10 +455,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
{
+ @SuppressWarnings("serial")
+ Map<String, String> defaultExchanges = new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }};
+
Connection connection = newConnection();
try
{
- UUID virtualHostId = UUIDGenerator.generateVhostUUID(parent.getName());
+ String virtualHostName = parent.getName();
+ UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
String stringifiedConfigVersion = BrokerModel.MODEL_VERSION;
boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
@@ -480,9 +485,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
+ virtualHostAttributes.put("name", virtualHostName);
- ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
- insertConfiguredObject(configuredObject, connection);
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
+ insertConfiguredObject(virtualHostRecord, connection);
if (getLogger().isDebugEnabled())
{
@@ -504,13 +510,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
UUID id = UUID.fromString(rs.getString(1));
String objectType = rs.getString(2);
+ if ("VirtualHost".equals(objectType))
+ {
+ continue;
+ }
Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
+
if(objectType.endsWith("Binding"))
{
bindingsToUpdate.put(id,attributes);
}
else
{
+ if (objectType.equals("Exchange"))
+ {
+ defaultExchanges.remove((String)attributes.get("name"));
+ }
others.add(id);
}
}
@@ -564,6 +579,19 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
+
+ for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
+ Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+ exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+ exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+ exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+ Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord);
+ ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
+ insertConfiguredObject(exchangeRecord, connection);
+ }
+
stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
@@ -604,12 +632,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
stmt.close();
}
- connection.commit();
if (tableExists)
{
dropConfigVersionTable(connection);
}
+
+ connection.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch(SQLException re)
+ {
+ }
+ throw e;
}
finally
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
index 7e49e0f6d3..7fff9151d9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
@@ -48,7 +47,7 @@ public class BrokerStoreUpgraderAndRecoverer
register(new Upgrader_1_0_to_1_1());
register(new Upgrader_1_1_to_1_2());
register(new Upgrader_1_2_to_1_3());
- register(new Upgrader_1_3_to_1_4());
+ register(new Upgrader_1_3_to_2_0());
}
private void register(StoreUpgraderPhase upgrader)
@@ -159,20 +158,16 @@ public class BrokerStoreUpgraderAndRecoverer
}
- private static final class Upgrader_1_3_to_1_4 extends StoreUpgraderPhase
+ private static final class Upgrader_1_3_to_2_0 extends StoreUpgraderPhase
{
- private Upgrader_1_3_to_1_4()
+ private final VirtualHostEntryUpgrader _virtualHostUpgrader;
+
+ private Upgrader_1_3_to_2_0()
{
- super("modelVersion", "1.3", "1.4");
+ super("modelVersion", "1.3", "2.0");
+ _virtualHostUpgrader = new VirtualHostEntryUpgrader();
}
- @SuppressWarnings("serial")
- private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>()
- {{
- put("BDB_HA", new BdbHaVirtualHostUpgrader());
- put("STANDARD", new StandardVirtualHostUpgrader());
- }};
-
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
@@ -184,13 +179,7 @@ public class BrokerStoreUpgraderAndRecoverer
throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath"));
}
- String type = (String) attributes.get("type");
- VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type);
- if (vhostUpgrader == null)
- {
- throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type);
- }
- record = vhostUpgrader.upgrade(record);
+ record = _virtualHostUpgrader.upgrade(record);
getUpdateMap().put(record.getId(), record);
}
else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType"))
@@ -218,143 +207,119 @@ public class BrokerStoreUpgraderAndRecoverer
}
- private static interface VirtualHostEntryUpgrader
- {
- ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost);
- }
-
- private static class StandardVirtualHostUpgrader implements VirtualHostEntryUpgrader
+ private static class VirtualHostEntryUpgrader
{
@SuppressWarnings("serial")
- Map<String, AttributesTransformer> _messageStoreAttributeTransformers = new HashMap<String, AttributesTransformer>()
+ Map<String, AttributesTransformer> _messageStoreToNodeTransformers = new HashMap<String, AttributesTransformer>()
{{
put("DERBY", new AttributesTransformer().
+ addAttributeTransformer("id", copyAttribute()).
+ addAttributeTransformer("name", copyAttribute()).
+ addAttributeTransformer("createdTime", copyAttribute()).
+ addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
- addAttributeTransformer("storeOverfullSize", copyAttribute()).
- addAttributeTransformer("storeType", mutateAttributeValue("DERBY")));
- put("MEMORY", new AttributesTransformer().
- addAttributeTransformer("storeType", mutateAttributeValue("Memory")));
+ addAttributeTransformer("storeOverfullSize", copyAttribute()));
+ put("Memory", new AttributesTransformer().
+ addAttributeTransformer("id", copyAttribute()).
+ addAttributeTransformer("name", copyAttribute()).
+ addAttributeTransformer("createdTime", copyAttribute()).
+ addAttributeTransformer("createdBy", copyAttribute()));
put("BDB", new AttributesTransformer().
+ addAttributeTransformer("id", copyAttribute()).
+ addAttributeTransformer("name", copyAttribute()).
+ addAttributeTransformer("createdTime", copyAttribute()).
+ addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
addAttributeTransformer("storeOverfullSize", copyAttribute()).
- addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()).
- addAttributeTransformer("storeType", mutateAttributeValue("BDB")));
+ addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("environmentConfiguration")));
put("JDBC", new AttributesTransformer().
+ addAttributeTransformer("id", copyAttribute()).
+ addAttributeTransformer("name", copyAttribute()).
+ addAttributeTransformer("createdTime", copyAttribute()).
+ addAttributeTransformer("createdBy", copyAttribute()).
addAttributeTransformer("storePath", mutateAttributeName("connectionURL")).
- addAttributeTransformer("connectionURL", copyAttribute()).
- addAttributeTransformer("connectionPool", copyAttribute()).
- addAttributeTransformer("jdbcBigIntType", copyAttribute()).
- addAttributeTransformer("jdbcBytesForBlob", copyAttribute()).
- addAttributeTransformer("jdbcBlobType", copyAttribute()).
- addAttributeTransformer("jdbcVarbinaryType", copyAttribute()).
+ addAttributeTransformer("connectionURL", mutateAttributeName("connectionUrl")).
+ addAttributeTransformer("connectionPool", mutateAttributeName("connectionPoolType")).
+ addAttributeTransformer("jdbcBigIntType", mutateAttributeName("bigIntType")).
+ addAttributeTransformer("jdbcBytesForBlob", mutateAttributeName("bytesForBlob")).
+ addAttributeTransformer("jdbcBlobType", mutateAttributeName("blobType")).
+ addAttributeTransformer("jdbcVarbinaryType", mutateAttributeName("varbinaryType")).
addAttributeTransformer("partitionCount", copyAttribute()).
addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()).
- addAttributeTransformer("minConnectionsPerPartition", copyAttribute()).
- addAttributeTransformer("storeType", mutateAttributeValue("JDBC")));
- }};
-
- @SuppressWarnings("serial")
- Map<String, AttributesTransformer> _configurationStoreAttributeTransformers = new HashMap<String, AttributesTransformer>()
- {{
- put("DERBY", new AttributesTransformer().
- addAttributeTransformer("configStorePath", mutateAttributeName("storePath")).
- addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("DERBY")));
- put("MEMORY", new AttributesTransformer().
- addAttributeTransformer("configStoreType", mutateAttributeValue("Memory")));
- put("JSON", new AttributesTransformer().
- addAttributeTransformer("configStorePath", mutateAttributeName("storePath")).
- addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JSON")));
- put("BDB", new AttributesTransformer().
- addAttributeTransformer("configStorePath", mutateAttributeName("storePath")).
- addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()).
- addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("BDB")));
- put("JDBC", new AttributesTransformer().
- addAttributeTransformer("configStorePath", mutateAttributeName("connectionURL")).
- addAttributeTransformer("configConnectionURL", mutateAttributeName("connectionURL")).
- addAttributeTransformer("connectionPool", copyAttribute()).
- addAttributeTransformer("jdbcBigIntType", copyAttribute()).
- addAttributeTransformer("jdbcBytesForBlob", copyAttribute()).
- addAttributeTransformer("jdbcBlobType", copyAttribute()).
- addAttributeTransformer("jdbcVarbinaryType", copyAttribute()).
- addAttributeTransformer("partitionCount", copyAttribute()).
- addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()).
- addAttributeTransformer("minConnectionsPerPartition", copyAttribute()).
- addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JDBC")));
+ addAttributeTransformer("minConnectionsPerPartition", copyAttribute()));
+ put("BDB_HA", new AttributesTransformer().
+ addAttributeTransformer("id", copyAttribute()).
+ addAttributeTransformer("createdTime", copyAttribute()).
+ addAttributeTransformer("createdBy", copyAttribute()).
+ addAttributeTransformer("storePath", copyAttribute()).
+ addAttributeTransformer("storeUnderfullSize", copyAttribute()).
+ addAttributeTransformer("storeOverfullSize", copyAttribute()).
+ addAttributeTransformer("haNodeName", mutateAttributeName("name")).
+ addAttributeTransformer("haGroupName", mutateAttributeName("groupName")).
+ addAttributeTransformer("haHelperAddress", mutateAttributeName("helperAddress")).
+ addAttributeTransformer("haNodeAddress", mutateAttributeName("address")).
+ addAttributeTransformer("haDesignatedPrimary", mutateAttributeName("designatedPrimary")).
+ addAttributeTransformer("haReplicationConfig", mutateAttributeName("replicatedEnvironmentConfiguration")).
+ addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("environmentConfiguration")));
}};
- @Override
public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost)
{
Map<String, Object> attributes = vhost.getAttributes();
- Map<String, Object> newAttributes = new HashMap<String, Object>(attributes);
-
- String capitalisedStoreType = String.valueOf(attributes.get("storeType")).toUpperCase();
- AttributesTransformer vhAttrsToMessageStoreSettings = _messageStoreAttributeTransformers.get(capitalisedStoreType);
- Map<String, Object> messageStoreSettings = null;
- if (vhAttrsToMessageStoreSettings != null)
+ String type = (String) attributes.get("type");
+ AttributesTransformer nodeAttributeTransformer = null;
+ if ("STANDARD".equalsIgnoreCase(type))
{
- messageStoreSettings = vhAttrsToMessageStoreSettings.upgrade(attributes);
+ if (attributes.containsKey("configStoreType"))
+ {
+ throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name")
+ + " with split configuration and message store is not supported."
+ + " Configuration store type is " + attributes.get("configStoreType") + " and message store type is "
+ + attributes.get("storeType"));
+ }
+ else
+ {
+ type = (String) attributes.get("storeType");
+ }
}
- if (attributes.containsKey("configStoreType"))
+ if (type == null)
{
- String capitaliseConfigStoreType = ((String) attributes.get("configStoreType")).toUpperCase();
- AttributesTransformer vhAttrsToConfigurationStoreSettings = _configurationStoreAttributeTransformers
- .get(capitaliseConfigStoreType);
- Map<String, Object> configurationStoreSettings = vhAttrsToConfigurationStoreSettings.upgrade(attributes);
- newAttributes.keySet().removeAll(vhAttrsToConfigurationStoreSettings.getNamesToBeDeleted());
- newAttributes.put("configurationStoreSettings", configurationStoreSettings);
+ throw new IllegalConfigurationException("Cannot auto-upgrade virtual host with attributes: " + attributes);
}
- if (vhAttrsToMessageStoreSettings != null)
+ type = getVirtualHostNodeType(type);
+ nodeAttributeTransformer = _messageStoreToNodeTransformers.get(type);
+
+ if (nodeAttributeTransformer == null)
{
- newAttributes.keySet().removeAll(vhAttrsToMessageStoreSettings.getNamesToBeDeleted());
- newAttributes.put("messageStoreSettings", messageStoreSettings);
+ throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type);
}
- return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents());
+ Map<String, Object> nodeAttributes = nodeAttributeTransformer.upgrade(attributes);
+ nodeAttributes.put("type", type);
+ nodeAttributes.put("messageStoreProvider", true);
+ return new ConfiguredObjectRecordImpl(vhost.getId(), "VirtualHostNode", nodeAttributes, vhost.getParents());
}
- }
-
- private static class BdbHaVirtualHostUpgrader implements VirtualHostEntryUpgrader
- {
- private final AttributesTransformer haAttributesTransformer = new AttributesTransformer().
- addAttributeTransformer("storePath", copyAttribute()).
- addAttributeTransformer("storeUnderfullSize", copyAttribute()).
- addAttributeTransformer("storeOverfullSize", copyAttribute()).
- addAttributeTransformer("haNodeName", copyAttribute()).
- addAttributeTransformer("haGroupName", copyAttribute()).
- addAttributeTransformer("haHelperAddress", copyAttribute()).
- addAttributeTransformer("haCoalescingSync", copyAttribute()).
- addAttributeTransformer("haNodeAddress", copyAttribute()).
- addAttributeTransformer("haDurability", copyAttribute()).
- addAttributeTransformer("haDesignatedPrimary", copyAttribute()).
- addAttributeTransformer("haReplicationConfig", copyAttribute()).
- addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()).
- addAttributeTransformer("storeType", removeAttribute());
-
- @Override
- public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost)
+ private String getVirtualHostNodeType(String type)
{
- Map<String, Object> attributes = vhost.getAttributes();
-
- Map<String, Object> messageStoreSettings = haAttributesTransformer.upgrade(attributes);
-
- Map<String, Object> newAttributes = new HashMap<String, Object>(attributes);
- newAttributes.keySet().removeAll(haAttributesTransformer.getNamesToBeDeleted());
- newAttributes.put("messageStoreSettings", messageStoreSettings);
-
- return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents());
+ for (String t : _messageStoreToNodeTransformers.keySet())
+ {
+ if (type.equalsIgnoreCase(t))
+ {
+ return t;
+ }
+ }
+ return null;
}
}
private static class AttributesTransformer
{
private final Map<String, List<AttributeTransformer>> _transformers = new HashMap<String, List<AttributeTransformer>>();
- private Set<String> _namesToBeDeleted = new HashSet<String>();
public AttributesTransformer addAttributeTransformer(String string, AttributeTransformer... attributeTransformers)
{
@@ -386,17 +351,10 @@ public class BrokerStoreUpgraderAndRecoverer
{
settings.put(newEntry.getKey(), newEntry.getValue());
}
-
- _namesToBeDeleted.add(attributeName);
}
}
return settings;
}
-
- public Set<String> getNamesToBeDeleted()
- {
- return _namesToBeDeleted;
- }
}
private static AttributeTransformer copyAttribute()
@@ -404,16 +362,6 @@ public class BrokerStoreUpgraderAndRecoverer
return CopyAttribute.INSTANCE;
}
- private static AttributeTransformer removeAttribute()
- {
- return RemoveAttribute.INSTANCE;
- }
-
- private static AttributeTransformer mutateAttributeValue(Object newValue)
- {
- return new MutateAttributeValue(newValue);
- }
-
private static AttributeTransformer mutateAttributeName(String newName)
{
return new MutateAttributeName(newName);
@@ -438,22 +386,6 @@ public class BrokerStoreUpgraderAndRecoverer
return entry;
}
}
-
- private static class RemoveAttribute implements AttributeTransformer
- {
- private static final RemoveAttribute INSTANCE = new RemoveAttribute();
-
- private RemoveAttribute()
- {
- }
-
- @Override
- public MutableEntry transform(MutableEntry entry)
- {
- return null;
- }
- }
-
private static class MutateAttributeName implements AttributeTransformer
{
private final String _newName;
@@ -471,23 +403,6 @@ public class BrokerStoreUpgraderAndRecoverer
}
}
- private static class MutateAttributeValue implements AttributeTransformer
- {
- private final Object _newValue;
-
- public MutateAttributeValue(Object newValue)
- {
- _newValue = newValue;
- }
-
- @Override
- public MutableEntry transform(MutableEntry entry)
- {
- entry.setValue(_newValue);
- return entry;
- }
- }
-
private static class MutableEntry
{
private String _key;
@@ -513,20 +428,12 @@ public class BrokerStoreUpgraderAndRecoverer
{
return _value;
}
-
- public void setValue(Object value)
- {
- _value = value;
- }
}
public Broker<?> perform(final DurableConfigurationStore store)
{
- final String brokerCategory = Broker.class.getSimpleName();
- final GenericStoreUpgrader upgrader = new GenericStoreUpgrader(brokerCategory, Broker.MODEL_VERSION, store, _upgraders);
- upgrader.upgrade();
-
- new GenericRecoverer(_systemContext, brokerCategory).recover(upgrader.getRecords());
+ List<ConfiguredObjectRecord> upgradedRecords = upgrade(store);
+ new GenericRecoverer(_systemContext, Broker.class.getSimpleName()).recover(upgradedRecords);
final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store);
applyRecursively(_systemContext.getBroker(), new Action<ConfiguredObject<?>>()
@@ -541,6 +448,13 @@ public class BrokerStoreUpgraderAndRecoverer
return _systemContext.getBroker();
}
+ List<ConfiguredObjectRecord> upgrade(final DurableConfigurationStore store)
+ {
+ GenericStoreUpgrader upgrader = new GenericStoreUpgrader(Broker.class.getSimpleName(), Broker.MODEL_VERSION, store, _upgraders);
+ upgrader.upgrade();
+ return upgrader.getRecords();
+ }
+
private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action)
{
applyRecursively(object, action, new HashSet<ConfiguredObject<?>>());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index e1772c037a..a88d647ba7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -60,7 +60,7 @@ public class VirtualHostStoreUpgraderAndRecoverer
register(new Upgrader_0_1_to_0_2());
register(new Upgrader_0_2_to_0_3());
register(new Upgrader_0_3_to_0_4());
- register(new Upgrader_0_4_to_0_5());
+ register(new Upgrader_0_4_to_2_0());
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -344,15 +344,12 @@ public class VirtualHostStoreUpgraderAndRecoverer
}
- private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase
+ private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
{
private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
- private static final String EXCHANGE_NAME = "name";
- private static final String EXCHANGE_TYPE = "type";
- private static final String EXCHANGE_DURABLE = "durable";
private ConfiguredObjectRecord _virtualHostRecord;
- public Upgrader_0_4_to_0_5()
+ public Upgrader_0_4_to_2_0()
{
super("modelVersion", "0.4", "2.0");
}
@@ -372,7 +369,7 @@ public class VirtualHostStoreUpgraderAndRecoverer
else if("Exchange".equals(record.getType()))
{
Map<String, Object> attributes = record.getAttributes();
- String name = (String)attributes.get(EXCHANGE_NAME);
+ String name = (String)attributes.get("name");
_missingAmqpExchanges.remove(name);
}
getNextUpgrader().configuredObject(record);
@@ -388,9 +385,9 @@ public class VirtualHostStoreUpgraderAndRecoverer
UUID id = _defaultExchangeIds.get(name);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(EXCHANGE_NAME, name);
- attributes.put(EXCHANGE_TYPE, type);
- attributes.put(EXCHANGE_DURABLE, true);
+ attributes.put("name", name);
+ attributes.put("type", type);
+ attributes.put("lifetimePolicy", "PERMANENT");
ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord));
getUpdateMap().put(id, record);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
new file mode 100644
index 0000000000..006c96a92a
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
@@ -0,0 +1,290 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.BrokerOptions;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogRecorder;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.model.SystemContextImpl;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase
+{
+ private ConfiguredObjectRecord _brokerRecord;
+ private CurrentThreadTaskExecutor _taskExecutor;
+ private SystemContext<?> _systemContext;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Map<String, Object> brokerAttributes = new HashMap<>();
+ brokerAttributes.put("createdTime", 1401385808828l);
+ brokerAttributes.put("defaultVirtualHost", "test");
+ brokerAttributes.put("modelVersion", "1.3");
+ brokerAttributes.put("name", "Broker");
+
+ _brokerRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Broker", brokerAttributes);
+ _taskExecutor = new CurrentThreadTaskExecutor();
+ _taskExecutor.start();
+ _systemContext = new SystemContextImpl(_taskExecutor,
+ mock(EventLogger.class),
+ mock(LogRecorder.class),
+ mock(BrokerOptions.class));
+ }
+
+ public void testUpgradeVirtualHostWithJDBCStore()
+ {
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("name", "test");
+ hostAttributes.put("modelVersion", "0.4");
+ hostAttributes.put("connectionPool", "BONECP");
+ hostAttributes.put("connectionURL", "jdbc:derby://localhost:1527/tmp/vh/test;create=true");
+ hostAttributes.put("createdBy", "webadmin");
+ hostAttributes.put("createdTime", 1401385905260l);
+ hostAttributes.put("maxConnectionsPerPartition", 7);
+ hostAttributes.put("minConnectionsPerPartition", 6);
+ hostAttributes.put("partitionCount", 2);
+ hostAttributes.put("storeType", "jdbc");
+ hostAttributes.put("type", "STANDARD");
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost",
+ hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord));
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord);
+
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs);
+
+ ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records);
+ assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType());
+ Map<String,Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("connectionPoolType", "BONECP");
+ expectedAttributes.put("connectionUrl", "jdbc:derby://localhost:1527/tmp/vh/test;create=true");
+ expectedAttributes.put("createdBy", "webadmin");
+ expectedAttributes.put("createdTime", 1401385905260l);
+ expectedAttributes.put("maxConnectionsPerPartition", 7);
+ expectedAttributes.put("minConnectionsPerPartition", 6);
+ expectedAttributes.put("partitionCount", 2);
+ expectedAttributes.put("name", "test");
+ expectedAttributes.put("type", "JDBC");
+ expectedAttributes.put("messageStoreProvider", true);
+ assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes());
+ }
+
+ public void testUpgradeVirtualHostWithDerbyStore()
+ {
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("name", "test");
+ hostAttributes.put("modelVersion", "0.4");
+ hostAttributes.put("storePath", "/tmp/vh/derby");
+ hostAttributes.put("storeType", "derby");
+ hostAttributes.put("createdBy", "webadmin");
+ hostAttributes.put("createdTime", 1401385905260l);
+ hostAttributes.put("type", "STANDARD");
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost",
+ hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord));
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord);
+
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs);
+
+ ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records);
+ assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType());
+ Map<String,Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("storePath", "/tmp/vh/derby");
+ expectedAttributes.put("createdBy", "webadmin");
+ expectedAttributes.put("createdTime", 1401385905260l);
+ expectedAttributes.put("name", "test");
+ expectedAttributes.put("type", "DERBY");
+ expectedAttributes.put("messageStoreProvider", true);
+ assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes());
+ }
+
+ public void testUpgradeVirtualHostWithBDBStore()
+ {
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("name", "test");
+ hostAttributes.put("modelVersion", "0.4");
+ hostAttributes.put("storePath", "/tmp/vh/bdb");
+ hostAttributes.put("storeType", "bdb");
+ hostAttributes.put("createdBy", "webadmin");
+ hostAttributes.put("createdTime", 1401385905260l);
+ hostAttributes.put("type", "STANDARD");
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost",
+ hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord));
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord);
+
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs);
+
+ ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records);
+ assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType());
+ Map<String,Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("storePath", "/tmp/vh/bdb");
+ expectedAttributes.put("createdBy", "webadmin");
+ expectedAttributes.put("createdTime", 1401385905260l);
+ expectedAttributes.put("name", "test");
+ expectedAttributes.put("type", "BDB");
+ expectedAttributes.put("messageStoreProvider", true);
+ assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes());
+ }
+
+ public void testUpgradeVirtualHostWithBDBHAStore()
+ {
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("name", "test");
+ hostAttributes.put("modelVersion", "0.4");
+ hostAttributes.put("createdBy", "webadmin");
+ hostAttributes.put("createdTime", 1401385905260l);
+ hostAttributes.put("type", "BDB_HA");
+ hostAttributes.put("storePath", "/tmp/vh/bdbha");
+ hostAttributes.put("haCoalescingSync", "true");
+ hostAttributes.put("haDesignatedPrimary", "true");
+ hostAttributes.put("haGroupName", "ha");
+ hostAttributes.put("haHelperAddress", "localhost:7000");
+ hostAttributes.put("haNodeAddress", "localhost:7000");
+ hostAttributes.put("haNodeName", "n1");
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost",
+ hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord));
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord);
+
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs);
+
+ ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records);
+ assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType());
+ Map<String,Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("createdBy", "webadmin");
+ expectedAttributes.put("createdTime", 1401385905260l);
+ expectedAttributes.put("type", "BDB_HA");
+ expectedAttributes.put("storePath", "/tmp/vh/bdbha");
+ expectedAttributes.put("designatedPrimary", "true");
+ expectedAttributes.put("groupName", "ha");
+ expectedAttributes.put("address", "localhost:7000");
+ expectedAttributes.put("helperAddress", "localhost:7000");
+ expectedAttributes.put("name", "n1");
+ expectedAttributes.put("messageStoreProvider", true);
+ assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes());
+ }
+
+ public void testUpgradeVirtualHostWithMemoryStore()
+ {
+ Map<String, Object> hostAttributes = new HashMap<>();
+ hostAttributes.put("name", "test");
+ hostAttributes.put("modelVersion", "0.4");
+ hostAttributes.put("storeType", "memory");
+ hostAttributes.put("createdBy", "webadmin");
+ hostAttributes.put("createdTime", 1401385905260l);
+ hostAttributes.put("type", "STANDARD");
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost",
+ hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord));
+ DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord);
+
+ BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext);
+ List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs);
+
+ ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records);
+ assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType());
+ Map<String,Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put("createdBy", "webadmin");
+ expectedAttributes.put("createdTime", 1401385905260l);
+ expectedAttributes.put("name", "test");
+ expectedAttributes.put("type", "Memory");
+ expectedAttributes.put("messageStoreProvider", true);
+ assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes());
+ }
+
+ private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)
+ {
+ for (ConfiguredObjectRecord configuredObjectRecord : records)
+ {
+ if (configuredObjectRecord.getId().equals(id))
+ {
+ return configuredObjectRecord;
+ }
+ }
+ return null;
+ }
+
+ class DurableConfigurationStoreStub implements DurableConfigurationStore
+ {
+ private ConfiguredObjectRecord[] records;
+
+ public DurableConfigurationStoreStub(ConfiguredObjectRecord... records)
+ {
+ super();
+ this.records = records;
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException
+ {
+ }
+
+ @Override
+ public void create(ConfiguredObjectRecord object) throws StoreException
+ {
+ }
+
+ @Override
+ public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
+ {
+ return null;
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin();
+ for (ConfiguredObjectRecord record : records)
+ {
+ handler.handle(record);
+ }
+ handler.end();
+ }
+ }
+}