diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-06-06 08:26:02 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-06-06 08:26:02 +0000 |
| commit | 53fd008b70676ce1382bec414bcd0d86299a4ced (patch) | |
| tree | 27c3e38bcc0d4a0551d048e44da19618474bc1e0 /qpid/java | |
| parent | 60cb3d99e3661103d20cdd7a9d599c62fe2d4b8f (diff) | |
| download | qpid-python-53fd008b70676ce1382bec414bcd0d86299a4ced.tar.gz | |
QPID-5715: Fix various issues with configuration upgrader to model 2 and store upgraders to version 8
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 513 insertions, 249 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 39bfdbc6bf..9fc90f9d7c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,7 +25,6 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -233,11 +232,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { child.addParent(hk.getParentType(), parent); } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); - } } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index faccd2fdf4..05841b86ae 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -53,6 +53,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade { private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){}; + @SuppressWarnings("serial") + private Map<String, String> _defaultExchanges = new HashMap<String, String>() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}; + @Override public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { @@ -76,10 +85,10 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade } configVersionDb.close(); + String virtualHostName = parent.getName(); Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); - virtualHostAttributes.put("name", parent.getName()); - String virtualHostName = parent.getName(); + virtualHostAttributes.put("name", virtualHostName); UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); ConfiguredObjectRecord virtualHostRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); @@ -90,33 +99,41 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade objectsCursor = configuredObjectsDb.openCursor(txn, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); + ObjectMapper mapper = new ObjectMapper(); while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); TupleInput input = TupleBinding.entryToInput(value); String type = input.readString(); + String json = input.readString(); + Map<String,Object> attributes = null; + try + { + attributes = mapper.readValue(json, MAP_TYPE_REFERENCE); + } + catch (Exception e) + { + throw new StoreException(e); + } + String name = (String)attributes.get("name"); + + if (type.equals("Exchange")) + { + _defaultExchanges.remove(name); + } if(!type.endsWith("Binding")) { - UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value); - TupleOutput tupleOutput = new TupleOutput(); - tupleOutput.writeLong(id.getMostSignificantBits()); - tupleOutput.writeLong(id.getLeastSignificantBits()); - tupleOutput.writeString("VirtualHost"); - TupleBinding.outputToEntry(tupleOutput, key); - hierarchyDb.put(txn, key, value); + storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId); } else { - String json = input.readString(); - ObjectMapper mapper = new ObjectMapper(); try { DatabaseEntry hierarchyKey = new DatabaseEntry(); DatabaseEntry hierarchyValue = new DatabaseEntry(); - Map<String,Object> attributes = mapper.readValue(json, MAP_TYPE_REFERENCE); Object queueIdString = attributes.remove("queue"); if(queueIdString instanceof String) { @@ -165,6 +182,17 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade } storeConfiguredObjectEntry(configuredObjectsDb, txn, virtualHostRecord); + for (Map.Entry<String, String> defaultExchangeEntry : _defaultExchanges.entrySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); + Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + exchangeAttributes.put("name", defaultExchangeEntry.getKey()); + exchangeAttributes.put("type", defaultExchangeEntry.getValue()); + exchangeAttributes.put("lifetimePolicy", "PERMANENT"); + ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes); + storeConfiguredObjectEntry(configuredObjectsDb, txn, exchangeRecord); + storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId); + } txn.commit(); hierarchyDb.close(); @@ -173,6 +201,19 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade reportFinished(environment, 8); } + void storeVirtualHostHierarchyRecord(Database hierarchyDb, Transaction txn, UUID id, UUID virtualHostId) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("VirtualHost"); + TupleBinding.outputToEntry(tupleOutput, key); + hierarchyDb.put(txn, key, value); + } + private int getConfigVersion(Database configVersionDb) { Cursor cursor = null; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java index 9bd5c96fe6..fc7142e9e4 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java @@ -70,8 +70,8 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase UpgradeFrom7To8 upgrade = new UpgradeFrom7To8(); upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 7); - assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 9); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11); + assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13); assertConfiguredObjects(); assertConfiguredObjectHierarchy(); @@ -81,7 +81,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase private void assertConfiguredObjectHierarchy() { Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy(); - assertEquals("Unexpected number of configured objects", 9, hierarchy.size()); + assertEquals("Unexpected number of configured objects", 13, hierarchy.size()); UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName()); @@ -97,16 +97,21 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName()); assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid); - // ! amq.direct -> virtualhost (This will change when the upgrader is changed to create the default exchanges) - UpgradeHierarchyKey amqDirectToVhParent = new UpgradeHierarchyKey(amqDirectUuid, VirtualHost.class.getSimpleName()); - assertFalse("amq.direct should not have a binding to virtualhost", hierarchy.containsKey(amqDirectToVhParent)); - // queue1binding -> amq.direct // queue1binding -> queue1 UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName()); UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName()); assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid); assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid); + + String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"}; + for (String exchangeName : defaultExchanges) + { + UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()); + UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName()); + assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid); + } + } private void assertExpectedHierarchyEntry( @@ -121,7 +126,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase private void assertConfiguredObjects() { Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 7, configuredObjects.size()); + assertEquals("Unexpected number of configured objects", 11, configuredObjects.size()); Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>(); @@ -137,6 +142,11 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null)); expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null)); + expected.putAll(createExpectedExchangeMap("amq.direct", "direct")); + expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout")); + expected.putAll(createExpectedExchangeMap("amq.match", "headers")); + expected.putAll(createExpectedExchangeMap("amq.topic", "topic")); + MapJsonSerializer jsonSerializer = new MapJsonSerializer(); for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index 6d95c5d3a2..14c9e7d3e0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -632,8 +632,8 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore { Map<String, Class<? extends ConfiguredObject>> relationships = new HashMap<String, Class<? extends ConfiguredObject>>(); - Collection<Class<? extends ConfiguredObject>> children = _parent.getModel().getChildTypes(Broker.class); - for (Class<? extends ConfiguredObject> childClass : children) + Collection<Class<? extends ConfiguredObject>> categories = _parent.getModel().getSupportedCategories(); + for (Class<? extends ConfiguredObject> childClass : categories) { String name = childClass.getSimpleName().toLowerCase(); String relationshipName = name + (name.endsWith("s") ? "es" : "s"); @@ -812,29 +812,7 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore private Class<? extends ConfiguredObject> findExpectedChildConfiguredObjectClass(String parentFieldName, Class<? extends ConfiguredObject> parentConfiguredObjectClass) { - if (parentConfiguredObjectClass == Broker.class) - { - return _brokerChildrenRelationshipMap.get(parentFieldName); - } - - // for non-broker parent classes - // try to determine the child class from the model by iterating through the children classes - // for the parent configured object class - if (parentConfiguredObjectClass != null) - { - Collection<Class<? extends ConfiguredObject>> childTypes = _parent.getModel().getChildTypes(parentConfiguredObjectClass); - for (Class<? extends ConfiguredObject> childType : childTypes) - { - String relationship = childType.getSimpleName().toLowerCase(); - relationship += relationship.endsWith("s") ? "es": "s"; - if (parentFieldName.equals(relationship)) - { - return childType; - } - } - } - - return null; + return _brokerChildrenRelationshipMap.get(parentFieldName); } private Object toObject(JsonNode node) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 2f83b34692..e0c1f77d2b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -306,11 +306,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { child.addParent(parentType, parent); } - else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange")) - { - // TODO - remove this hack for amq. exchanges - child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap())); - } } } finally @@ -460,10 +455,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException { + @SuppressWarnings("serial") + Map<String, String> defaultExchanges = new HashMap<String, String>() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}; + Connection connection = newConnection(); try { - UUID virtualHostId = UUIDGenerator.generateVhostUUID(parent.getName()); + String virtualHostName = parent.getName(); + UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); String stringifiedConfigVersion = BrokerModel.MODEL_VERSION; boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); @@ -480,9 +485,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); + virtualHostAttributes.put("name", virtualHostName); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); - insertConfiguredObject(configuredObject, connection); + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); + insertConfiguredObject(virtualHostRecord, connection); if (getLogger().isDebugEnabled()) { @@ -504,13 +510,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { UUID id = UUID.fromString(rs.getString(1)); String objectType = rs.getString(2); + if ("VirtualHost".equals(objectType)) + { + continue; + } Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); + if(objectType.endsWith("Binding")) { bindingsToUpdate.put(id,attributes); } else { + if (objectType.equals("Exchange")) + { + defaultExchanges.remove((String)attributes.get("name")); + } others.add(id); } } @@ -564,6 +579,19 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } + + for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); + Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + exchangeAttributes.put("name", defaultExchangeEntry.getKey()); + exchangeAttributes.put("type", defaultExchangeEntry.getValue()); + exchangeAttributes.put("lifetimePolicy", "PERMANENT"); + Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord); + ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents); + insertConfiguredObject(exchangeRecord, connection); + } + stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); try { @@ -604,12 +632,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC { stmt.close(); } - connection.commit(); if (tableExists) { dropConfigVersionTable(connection); } + + connection.commit(); + } + catch(SQLException e) + { + try + { + connection.rollback(); + } + catch(SQLException re) + { + } + throw e; } finally { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index 7e49e0f6d3..7fff9151d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; @@ -48,7 +47,7 @@ public class BrokerStoreUpgraderAndRecoverer register(new Upgrader_1_0_to_1_1()); register(new Upgrader_1_1_to_1_2()); register(new Upgrader_1_2_to_1_3()); - register(new Upgrader_1_3_to_1_4()); + register(new Upgrader_1_3_to_2_0()); } private void register(StoreUpgraderPhase upgrader) @@ -159,20 +158,16 @@ public class BrokerStoreUpgraderAndRecoverer } - private static final class Upgrader_1_3_to_1_4 extends StoreUpgraderPhase + private static final class Upgrader_1_3_to_2_0 extends StoreUpgraderPhase { - private Upgrader_1_3_to_1_4() + private final VirtualHostEntryUpgrader _virtualHostUpgrader; + + private Upgrader_1_3_to_2_0() { - super("modelVersion", "1.3", "1.4"); + super("modelVersion", "1.3", "2.0"); + _virtualHostUpgrader = new VirtualHostEntryUpgrader(); } - @SuppressWarnings("serial") - private Map<String, VirtualHostEntryUpgrader> _vhostUpgraderMap = new HashMap<String, VirtualHostEntryUpgrader>() - {{ - put("BDB_HA", new BdbHaVirtualHostUpgrader()); - put("STANDARD", new StandardVirtualHostUpgrader()); - }}; - @Override public void configuredObject(ConfiguredObjectRecord record) { @@ -184,13 +179,7 @@ public class BrokerStoreUpgraderAndRecoverer throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + " having XML configuration is not supported. Virtual host configuration file is " + attributes.get("configPath")); } - String type = (String) attributes.get("type"); - VirtualHostEntryUpgrader vhostUpgrader = _vhostUpgraderMap.get(type); - if (vhostUpgrader == null) - { - throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); - } - record = vhostUpgrader.upgrade(record); + record = _virtualHostUpgrader.upgrade(record); getUpdateMap().put(record.getId(), record); } else if (record.getType().equals("Plugin") && record.getAttributes().containsKey("pluginType")) @@ -218,143 +207,119 @@ public class BrokerStoreUpgraderAndRecoverer } - private static interface VirtualHostEntryUpgrader - { - ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost); - } - - private static class StandardVirtualHostUpgrader implements VirtualHostEntryUpgrader + private static class VirtualHostEntryUpgrader { @SuppressWarnings("serial") - Map<String, AttributesTransformer> _messageStoreAttributeTransformers = new HashMap<String, AttributesTransformer>() + Map<String, AttributesTransformer> _messageStoreToNodeTransformers = new HashMap<String, AttributesTransformer>() {{ put("DERBY", new AttributesTransformer(). + addAttributeTransformer("id", copyAttribute()). + addAttributeTransformer("name", copyAttribute()). + addAttributeTransformer("createdTime", copyAttribute()). + addAttributeTransformer("createdBy", copyAttribute()). addAttributeTransformer("storePath", copyAttribute()). addAttributeTransformer("storeUnderfullSize", copyAttribute()). - addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("DERBY"))); - put("MEMORY", new AttributesTransformer(). - addAttributeTransformer("storeType", mutateAttributeValue("Memory"))); + addAttributeTransformer("storeOverfullSize", copyAttribute())); + put("Memory", new AttributesTransformer(). + addAttributeTransformer("id", copyAttribute()). + addAttributeTransformer("name", copyAttribute()). + addAttributeTransformer("createdTime", copyAttribute()). + addAttributeTransformer("createdBy", copyAttribute())); put("BDB", new AttributesTransformer(). + addAttributeTransformer("id", copyAttribute()). + addAttributeTransformer("name", copyAttribute()). + addAttributeTransformer("createdTime", copyAttribute()). + addAttributeTransformer("createdBy", copyAttribute()). addAttributeTransformer("storePath", copyAttribute()). addAttributeTransformer("storeUnderfullSize", copyAttribute()). addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("BDB"))); + addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("environmentConfiguration"))); put("JDBC", new AttributesTransformer(). + addAttributeTransformer("id", copyAttribute()). + addAttributeTransformer("name", copyAttribute()). + addAttributeTransformer("createdTime", copyAttribute()). + addAttributeTransformer("createdBy", copyAttribute()). addAttributeTransformer("storePath", mutateAttributeName("connectionURL")). - addAttributeTransformer("connectionURL", copyAttribute()). - addAttributeTransformer("connectionPool", copyAttribute()). - addAttributeTransformer("jdbcBigIntType", copyAttribute()). - addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). - addAttributeTransformer("jdbcBlobType", copyAttribute()). - addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). + addAttributeTransformer("connectionURL", mutateAttributeName("connectionUrl")). + addAttributeTransformer("connectionPool", mutateAttributeName("connectionPoolType")). + addAttributeTransformer("jdbcBigIntType", mutateAttributeName("bigIntType")). + addAttributeTransformer("jdbcBytesForBlob", mutateAttributeName("bytesForBlob")). + addAttributeTransformer("jdbcBlobType", mutateAttributeName("blobType")). + addAttributeTransformer("jdbcVarbinaryType", mutateAttributeName("varbinaryType")). addAttributeTransformer("partitionCount", copyAttribute()). addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("storeType", mutateAttributeValue("JDBC"))); - }}; - - @SuppressWarnings("serial") - Map<String, AttributesTransformer> _configurationStoreAttributeTransformers = new HashMap<String, AttributesTransformer>() - {{ - put("DERBY", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("DERBY"))); - put("MEMORY", new AttributesTransformer(). - addAttributeTransformer("configStoreType", mutateAttributeValue("Memory"))); - put("JSON", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JSON"))); - put("BDB", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("storePath")). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("BDB"))); - put("JDBC", new AttributesTransformer(). - addAttributeTransformer("configStorePath", mutateAttributeName("connectionURL")). - addAttributeTransformer("configConnectionURL", mutateAttributeName("connectionURL")). - addAttributeTransformer("connectionPool", copyAttribute()). - addAttributeTransformer("jdbcBigIntType", copyAttribute()). - addAttributeTransformer("jdbcBytesForBlob", copyAttribute()). - addAttributeTransformer("jdbcBlobType", copyAttribute()). - addAttributeTransformer("jdbcVarbinaryType", copyAttribute()). - addAttributeTransformer("partitionCount", copyAttribute()). - addAttributeTransformer("maxConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("minConnectionsPerPartition", copyAttribute()). - addAttributeTransformer("configStoreType", mutateAttributeName("storeType"), mutateAttributeValue("JDBC"))); + addAttributeTransformer("minConnectionsPerPartition", copyAttribute())); + put("BDB_HA", new AttributesTransformer(). + addAttributeTransformer("id", copyAttribute()). + addAttributeTransformer("createdTime", copyAttribute()). + addAttributeTransformer("createdBy", copyAttribute()). + addAttributeTransformer("storePath", copyAttribute()). + addAttributeTransformer("storeUnderfullSize", copyAttribute()). + addAttributeTransformer("storeOverfullSize", copyAttribute()). + addAttributeTransformer("haNodeName", mutateAttributeName("name")). + addAttributeTransformer("haGroupName", mutateAttributeName("groupName")). + addAttributeTransformer("haHelperAddress", mutateAttributeName("helperAddress")). + addAttributeTransformer("haNodeAddress", mutateAttributeName("address")). + addAttributeTransformer("haDesignatedPrimary", mutateAttributeName("designatedPrimary")). + addAttributeTransformer("haReplicationConfig", mutateAttributeName("replicatedEnvironmentConfiguration")). + addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName("environmentConfiguration"))); }}; - @Override public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) { Map<String, Object> attributes = vhost.getAttributes(); - Map<String, Object> newAttributes = new HashMap<String, Object>(attributes); - - String capitalisedStoreType = String.valueOf(attributes.get("storeType")).toUpperCase(); - AttributesTransformer vhAttrsToMessageStoreSettings = _messageStoreAttributeTransformers.get(capitalisedStoreType); - Map<String, Object> messageStoreSettings = null; - if (vhAttrsToMessageStoreSettings != null) + String type = (String) attributes.get("type"); + AttributesTransformer nodeAttributeTransformer = null; + if ("STANDARD".equalsIgnoreCase(type)) { - messageStoreSettings = vhAttrsToMessageStoreSettings.upgrade(attributes); + if (attributes.containsKey("configStoreType")) + { + throw new IllegalConfigurationException("Auto-upgrade of virtual host " + attributes.get("name") + + " with split configuration and message store is not supported." + + " Configuration store type is " + attributes.get("configStoreType") + " and message store type is " + + attributes.get("storeType")); + } + else + { + type = (String) attributes.get("storeType"); + } } - if (attributes.containsKey("configStoreType")) + if (type == null) { - String capitaliseConfigStoreType = ((String) attributes.get("configStoreType")).toUpperCase(); - AttributesTransformer vhAttrsToConfigurationStoreSettings = _configurationStoreAttributeTransformers - .get(capitaliseConfigStoreType); - Map<String, Object> configurationStoreSettings = vhAttrsToConfigurationStoreSettings.upgrade(attributes); - newAttributes.keySet().removeAll(vhAttrsToConfigurationStoreSettings.getNamesToBeDeleted()); - newAttributes.put("configurationStoreSettings", configurationStoreSettings); + throw new IllegalConfigurationException("Cannot auto-upgrade virtual host with attributes: " + attributes); } - if (vhAttrsToMessageStoreSettings != null) + type = getVirtualHostNodeType(type); + nodeAttributeTransformer = _messageStoreToNodeTransformers.get(type); + + if (nodeAttributeTransformer == null) { - newAttributes.keySet().removeAll(vhAttrsToMessageStoreSettings.getNamesToBeDeleted()); - newAttributes.put("messageStoreSettings", messageStoreSettings); + throw new IllegalConfigurationException("Don't know how to perform an upgrade from version for virtualhost type " + type); } - return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); + Map<String, Object> nodeAttributes = nodeAttributeTransformer.upgrade(attributes); + nodeAttributes.put("type", type); + nodeAttributes.put("messageStoreProvider", true); + return new ConfiguredObjectRecordImpl(vhost.getId(), "VirtualHostNode", nodeAttributes, vhost.getParents()); } - } - - private static class BdbHaVirtualHostUpgrader implements VirtualHostEntryUpgrader - { - private final AttributesTransformer haAttributesTransformer = new AttributesTransformer(). - addAttributeTransformer("storePath", copyAttribute()). - addAttributeTransformer("storeUnderfullSize", copyAttribute()). - addAttributeTransformer("storeOverfullSize", copyAttribute()). - addAttributeTransformer("haNodeName", copyAttribute()). - addAttributeTransformer("haGroupName", copyAttribute()). - addAttributeTransformer("haHelperAddress", copyAttribute()). - addAttributeTransformer("haCoalescingSync", copyAttribute()). - addAttributeTransformer("haNodeAddress", copyAttribute()). - addAttributeTransformer("haDurability", copyAttribute()). - addAttributeTransformer("haDesignatedPrimary", copyAttribute()). - addAttributeTransformer("haReplicationConfig", copyAttribute()). - addAttributeTransformer("bdbEnvironmentConfig", copyAttribute()). - addAttributeTransformer("storeType", removeAttribute()); - - @Override - public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost) + private String getVirtualHostNodeType(String type) { - Map<String, Object> attributes = vhost.getAttributes(); - - Map<String, Object> messageStoreSettings = haAttributesTransformer.upgrade(attributes); - - Map<String, Object> newAttributes = new HashMap<String, Object>(attributes); - newAttributes.keySet().removeAll(haAttributesTransformer.getNamesToBeDeleted()); - newAttributes.put("messageStoreSettings", messageStoreSettings); - - return new ConfiguredObjectRecordImpl(vhost.getId(), vhost.getType(), newAttributes, vhost.getParents()); + for (String t : _messageStoreToNodeTransformers.keySet()) + { + if (type.equalsIgnoreCase(t)) + { + return t; + } + } + return null; } } private static class AttributesTransformer { private final Map<String, List<AttributeTransformer>> _transformers = new HashMap<String, List<AttributeTransformer>>(); - private Set<String> _namesToBeDeleted = new HashSet<String>(); public AttributesTransformer addAttributeTransformer(String string, AttributeTransformer... attributeTransformers) { @@ -386,17 +351,10 @@ public class BrokerStoreUpgraderAndRecoverer { settings.put(newEntry.getKey(), newEntry.getValue()); } - - _namesToBeDeleted.add(attributeName); } } return settings; } - - public Set<String> getNamesToBeDeleted() - { - return _namesToBeDeleted; - } } private static AttributeTransformer copyAttribute() @@ -404,16 +362,6 @@ public class BrokerStoreUpgraderAndRecoverer return CopyAttribute.INSTANCE; } - private static AttributeTransformer removeAttribute() - { - return RemoveAttribute.INSTANCE; - } - - private static AttributeTransformer mutateAttributeValue(Object newValue) - { - return new MutateAttributeValue(newValue); - } - private static AttributeTransformer mutateAttributeName(String newName) { return new MutateAttributeName(newName); @@ -438,22 +386,6 @@ public class BrokerStoreUpgraderAndRecoverer return entry; } } - - private static class RemoveAttribute implements AttributeTransformer - { - private static final RemoveAttribute INSTANCE = new RemoveAttribute(); - - private RemoveAttribute() - { - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - return null; - } - } - private static class MutateAttributeName implements AttributeTransformer { private final String _newName; @@ -471,23 +403,6 @@ public class BrokerStoreUpgraderAndRecoverer } } - private static class MutateAttributeValue implements AttributeTransformer - { - private final Object _newValue; - - public MutateAttributeValue(Object newValue) - { - _newValue = newValue; - } - - @Override - public MutableEntry transform(MutableEntry entry) - { - entry.setValue(_newValue); - return entry; - } - } - private static class MutableEntry { private String _key; @@ -513,20 +428,12 @@ public class BrokerStoreUpgraderAndRecoverer { return _value; } - - public void setValue(Object value) - { - _value = value; - } } public Broker<?> perform(final DurableConfigurationStore store) { - final String brokerCategory = Broker.class.getSimpleName(); - final GenericStoreUpgrader upgrader = new GenericStoreUpgrader(brokerCategory, Broker.MODEL_VERSION, store, _upgraders); - upgrader.upgrade(); - - new GenericRecoverer(_systemContext, brokerCategory).recover(upgrader.getRecords()); + List<ConfiguredObjectRecord> upgradedRecords = upgrade(store); + new GenericRecoverer(_systemContext, Broker.class.getSimpleName()).recover(upgradedRecords); final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(store); applyRecursively(_systemContext.getBroker(), new Action<ConfiguredObject<?>>() @@ -541,6 +448,13 @@ public class BrokerStoreUpgraderAndRecoverer return _systemContext.getBroker(); } + List<ConfiguredObjectRecord> upgrade(final DurableConfigurationStore store) + { + GenericStoreUpgrader upgrader = new GenericStoreUpgrader(Broker.class.getSimpleName(), Broker.MODEL_VERSION, store, _upgraders); + upgrader.upgrade(); + return upgrader.getRecords(); + } + private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) { applyRecursively(object, action, new HashSet<ConfiguredObject<?>>()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index e1772c037a..a88d647ba7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -60,7 +60,7 @@ public class VirtualHostStoreUpgraderAndRecoverer register(new Upgrader_0_1_to_0_2()); register(new Upgrader_0_2_to_0_3()); register(new Upgrader_0_3_to_0_4()); - register(new Upgrader_0_4_to_0_5()); + register(new Upgrader_0_4_to_2_0()); Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) @@ -344,15 +344,12 @@ public class VirtualHostStoreUpgraderAndRecoverer } - private class Upgrader_0_4_to_0_5 extends StoreUpgraderPhase + private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase { private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES); - private static final String EXCHANGE_NAME = "name"; - private static final String EXCHANGE_TYPE = "type"; - private static final String EXCHANGE_DURABLE = "durable"; private ConfiguredObjectRecord _virtualHostRecord; - public Upgrader_0_4_to_0_5() + public Upgrader_0_4_to_2_0() { super("modelVersion", "0.4", "2.0"); } @@ -372,7 +369,7 @@ public class VirtualHostStoreUpgraderAndRecoverer else if("Exchange".equals(record.getType())) { Map<String, Object> attributes = record.getAttributes(); - String name = (String)attributes.get(EXCHANGE_NAME); + String name = (String)attributes.get("name"); _missingAmqpExchanges.remove(name); } getNextUpgrader().configuredObject(record); @@ -388,9 +385,9 @@ public class VirtualHostStoreUpgraderAndRecoverer UUID id = _defaultExchangeIds.get(name); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(EXCHANGE_NAME, name); - attributes.put(EXCHANGE_TYPE, type); - attributes.put(EXCHANGE_DURABLE, true); + attributes.put("name", name); + attributes.put("type", type); + attributes.put("lifetimePolicy", "PERMANENT"); ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(_virtualHostRecord.getType(), _virtualHostRecord)); getUpdateMap().put(id, record); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java new file mode 100644 index 0000000000..006c96a92a --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java @@ -0,0 +1,290 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.BrokerOptions; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.model.SystemContextImpl; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.test.utils.QpidTestCase; + + +public class BrokerStoreUpgraderAndRecovererTest extends QpidTestCase +{ + private ConfiguredObjectRecord _brokerRecord; + private CurrentThreadTaskExecutor _taskExecutor; + private SystemContext<?> _systemContext; + + public void setUp() throws Exception + { + super.setUp(); + Map<String, Object> brokerAttributes = new HashMap<>(); + brokerAttributes.put("createdTime", 1401385808828l); + brokerAttributes.put("defaultVirtualHost", "test"); + brokerAttributes.put("modelVersion", "1.3"); + brokerAttributes.put("name", "Broker"); + + _brokerRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Broker", brokerAttributes); + _taskExecutor = new CurrentThreadTaskExecutor(); + _taskExecutor.start(); + _systemContext = new SystemContextImpl(_taskExecutor, + mock(EventLogger.class), + mock(LogRecorder.class), + mock(BrokerOptions.class)); + } + + public void testUpgradeVirtualHostWithJDBCStore() + { + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("name", "test"); + hostAttributes.put("modelVersion", "0.4"); + hostAttributes.put("connectionPool", "BONECP"); + hostAttributes.put("connectionURL", "jdbc:derby://localhost:1527/tmp/vh/test;create=true"); + hostAttributes.put("createdBy", "webadmin"); + hostAttributes.put("createdTime", 1401385905260l); + hostAttributes.put("maxConnectionsPerPartition", 7); + hostAttributes.put("minConnectionsPerPartition", 6); + hostAttributes.put("partitionCount", 2); + hostAttributes.put("storeType", "jdbc"); + hostAttributes.put("type", "STANDARD"); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", + hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord)); + DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord); + + BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext); + List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs); + + ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records); + assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType()); + Map<String,Object> expectedAttributes = new HashMap<>(); + expectedAttributes.put("connectionPoolType", "BONECP"); + expectedAttributes.put("connectionUrl", "jdbc:derby://localhost:1527/tmp/vh/test;create=true"); + expectedAttributes.put("createdBy", "webadmin"); + expectedAttributes.put("createdTime", 1401385905260l); + expectedAttributes.put("maxConnectionsPerPartition", 7); + expectedAttributes.put("minConnectionsPerPartition", 6); + expectedAttributes.put("partitionCount", 2); + expectedAttributes.put("name", "test"); + expectedAttributes.put("type", "JDBC"); + expectedAttributes.put("messageStoreProvider", true); + assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes()); + } + + public void testUpgradeVirtualHostWithDerbyStore() + { + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("name", "test"); + hostAttributes.put("modelVersion", "0.4"); + hostAttributes.put("storePath", "/tmp/vh/derby"); + hostAttributes.put("storeType", "derby"); + hostAttributes.put("createdBy", "webadmin"); + hostAttributes.put("createdTime", 1401385905260l); + hostAttributes.put("type", "STANDARD"); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", + hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord)); + DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord); + + BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext); + List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs); + + ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records); + assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType()); + Map<String,Object> expectedAttributes = new HashMap<>(); + expectedAttributes.put("storePath", "/tmp/vh/derby"); + expectedAttributes.put("createdBy", "webadmin"); + expectedAttributes.put("createdTime", 1401385905260l); + expectedAttributes.put("name", "test"); + expectedAttributes.put("type", "DERBY"); + expectedAttributes.put("messageStoreProvider", true); + assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes()); + } + + public void testUpgradeVirtualHostWithBDBStore() + { + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("name", "test"); + hostAttributes.put("modelVersion", "0.4"); + hostAttributes.put("storePath", "/tmp/vh/bdb"); + hostAttributes.put("storeType", "bdb"); + hostAttributes.put("createdBy", "webadmin"); + hostAttributes.put("createdTime", 1401385905260l); + hostAttributes.put("type", "STANDARD"); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", + hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord)); + DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord); + + BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext); + List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs); + + ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records); + assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType()); + Map<String,Object> expectedAttributes = new HashMap<>(); + expectedAttributes.put("storePath", "/tmp/vh/bdb"); + expectedAttributes.put("createdBy", "webadmin"); + expectedAttributes.put("createdTime", 1401385905260l); + expectedAttributes.put("name", "test"); + expectedAttributes.put("type", "BDB"); + expectedAttributes.put("messageStoreProvider", true); + assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes()); + } + + public void testUpgradeVirtualHostWithBDBHAStore() + { + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("name", "test"); + hostAttributes.put("modelVersion", "0.4"); + hostAttributes.put("createdBy", "webadmin"); + hostAttributes.put("createdTime", 1401385905260l); + hostAttributes.put("type", "BDB_HA"); + hostAttributes.put("storePath", "/tmp/vh/bdbha"); + hostAttributes.put("haCoalescingSync", "true"); + hostAttributes.put("haDesignatedPrimary", "true"); + hostAttributes.put("haGroupName", "ha"); + hostAttributes.put("haHelperAddress", "localhost:7000"); + hostAttributes.put("haNodeAddress", "localhost:7000"); + hostAttributes.put("haNodeName", "n1"); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", + hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord)); + DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord); + + BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext); + List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs); + + ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records); + assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType()); + Map<String,Object> expectedAttributes = new HashMap<>(); + expectedAttributes.put("createdBy", "webadmin"); + expectedAttributes.put("createdTime", 1401385905260l); + expectedAttributes.put("type", "BDB_HA"); + expectedAttributes.put("storePath", "/tmp/vh/bdbha"); + expectedAttributes.put("designatedPrimary", "true"); + expectedAttributes.put("groupName", "ha"); + expectedAttributes.put("address", "localhost:7000"); + expectedAttributes.put("helperAddress", "localhost:7000"); + expectedAttributes.put("name", "n1"); + expectedAttributes.put("messageStoreProvider", true); + assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes()); + } + + public void testUpgradeVirtualHostWithMemoryStore() + { + Map<String, Object> hostAttributes = new HashMap<>(); + hostAttributes.put("name", "test"); + hostAttributes.put("modelVersion", "0.4"); + hostAttributes.put("storeType", "memory"); + hostAttributes.put("createdBy", "webadmin"); + hostAttributes.put("createdTime", 1401385905260l); + hostAttributes.put("type", "STANDARD"); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", + hostAttributes, Collections.<String,ConfiguredObjectRecord>singletonMap("Broker", _brokerRecord)); + DurableConfigurationStore dcs = new DurableConfigurationStoreStub(_brokerRecord, virtualHostRecord); + + BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemContext); + List<ConfiguredObjectRecord> records = recoverer.upgrade(dcs); + + ConfiguredObjectRecord upgradedVirtualHostNodeRecord = findRecordById(virtualHostRecord.getId(), records); + assertEquals("Unexpected type", "VirtualHostNode", upgradedVirtualHostNodeRecord.getType()); + Map<String,Object> expectedAttributes = new HashMap<>(); + expectedAttributes.put("createdBy", "webadmin"); + expectedAttributes.put("createdTime", 1401385905260l); + expectedAttributes.put("name", "test"); + expectedAttributes.put("type", "Memory"); + expectedAttributes.put("messageStoreProvider", true); + assertEquals("Unexpected attributes", expectedAttributes, upgradedVirtualHostNodeRecord.getAttributes()); + } + + private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records) + { + for (ConfiguredObjectRecord configuredObjectRecord : records) + { + if (configuredObjectRecord.getId().equals(id)) + { + return configuredObjectRecord; + } + } + return null; + } + + class DurableConfigurationStoreStub implements DurableConfigurationStore + { + private ConfiguredObjectRecord[] records; + + public DurableConfigurationStoreStub(ConfiguredObjectRecord... records) + { + super(); + this.records = records; + } + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException + { + } + + @Override + public void create(ConfiguredObjectRecord object) throws StoreException + { + } + + @Override + public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException + { + return null; + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException + { + } + + @Override + public void closeConfigurationStore() throws StoreException + { + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException + { + handler.begin(); + for (ConfiguredObjectRecord record : records) + { + handler.handle(record); + } + handler.end(); + } + } +} |
