diff options
| author | Alex Rudyy <orudyy@apache.org> | 2014-06-06 08:26:02 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2014-06-06 08:26:02 +0000 |
| commit | 53fd008b70676ce1382bec414bcd0d86299a4ced (patch) | |
| tree | 27c3e38bcc0d4a0551d048e44da19618474bc1e0 /qpid/java/bdbstore | |
| parent | 60cb3d99e3661103d20cdd7a9d599c62fe2d4b8f (diff) | |
| download | qpid-python-53fd008b70676ce1382bec414bcd0d86299a4ced.tar.gz | |
QPID-5715: Fix various issues with configuration upgrader to model 2 and store upgraders to version 8
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
3 files changed, 71 insertions, 26 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 39bfdbc6bf..9fc90f9d7c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,7 +25,6 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -233,11 +232,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { child.addParent(hk.getParentType(), parent); } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); - } } } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java index faccd2fdf4..05841b86ae 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -53,6 +53,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade { private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){}; + @SuppressWarnings("serial") + private Map<String, String> _defaultExchanges = new HashMap<String, String>() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}; + @Override public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { @@ -76,10 +85,10 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade } configVersionDb.close(); + String virtualHostName = parent.getName(); Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); - virtualHostAttributes.put("name", parent.getName()); - String virtualHostName = parent.getName(); + virtualHostAttributes.put("name", virtualHostName); UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); ConfiguredObjectRecord virtualHostRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); @@ -90,33 +99,41 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade objectsCursor = configuredObjectsDb.openCursor(txn, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); + ObjectMapper mapper = new ObjectMapper(); while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); TupleInput input = TupleBinding.entryToInput(value); String type = input.readString(); + String json = input.readString(); + Map<String,Object> attributes = null; + try + { + attributes = mapper.readValue(json, MAP_TYPE_REFERENCE); + } + catch (Exception e) + { + throw new StoreException(e); + } + String name = (String)attributes.get("name"); + + if (type.equals("Exchange")) + { + _defaultExchanges.remove(name); + } if(!type.endsWith("Binding")) { - UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value); - TupleOutput tupleOutput = new TupleOutput(); - tupleOutput.writeLong(id.getMostSignificantBits()); - tupleOutput.writeLong(id.getLeastSignificantBits()); - tupleOutput.writeString("VirtualHost"); - TupleBinding.outputToEntry(tupleOutput, key); - hierarchyDb.put(txn, key, value); + storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId); } else { - String json = input.readString(); - ObjectMapper mapper = new ObjectMapper(); try { DatabaseEntry hierarchyKey = new DatabaseEntry(); DatabaseEntry hierarchyValue = new DatabaseEntry(); - Map<String,Object> attributes = mapper.readValue(json, MAP_TYPE_REFERENCE); Object queueIdString = attributes.remove("queue"); if(queueIdString instanceof String) { @@ -165,6 +182,17 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade } storeConfiguredObjectEntry(configuredObjectsDb, txn, virtualHostRecord); + for (Map.Entry<String, String> defaultExchangeEntry : _defaultExchanges.entrySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); + Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + exchangeAttributes.put("name", defaultExchangeEntry.getKey()); + exchangeAttributes.put("type", defaultExchangeEntry.getValue()); + exchangeAttributes.put("lifetimePolicy", "PERMANENT"); + ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes); + storeConfiguredObjectEntry(configuredObjectsDb, txn, exchangeRecord); + storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId); + } txn.commit(); hierarchyDb.close(); @@ -173,6 +201,19 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade reportFinished(environment, 8); } + void storeVirtualHostHierarchyRecord(Database hierarchyDb, Transaction txn, UUID id, UUID virtualHostId) + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("VirtualHost"); + TupleBinding.outputToEntry(tupleOutput, key); + hierarchyDb.put(txn, key, value); + } + private int getConfigVersion(Database configVersionDb) { Cursor cursor = null; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java index 9bd5c96fe6..fc7142e9e4 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java @@ -70,8 +70,8 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase UpgradeFrom7To8 upgrade = new UpgradeFrom7To8(); upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); - assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 7); - assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 9); + assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11); + assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13); assertConfiguredObjects(); assertConfiguredObjectHierarchy(); @@ -81,7 +81,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase private void assertConfiguredObjectHierarchy() { Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy(); - assertEquals("Unexpected number of configured objects", 9, hierarchy.size()); + assertEquals("Unexpected number of configured objects", 13, hierarchy.size()); UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName()); UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName()); @@ -97,16 +97,21 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName()); assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid); - // ! amq.direct -> virtualhost (This will change when the upgrader is changed to create the default exchanges) - UpgradeHierarchyKey amqDirectToVhParent = new UpgradeHierarchyKey(amqDirectUuid, VirtualHost.class.getSimpleName()); - assertFalse("amq.direct should not have a binding to virtualhost", hierarchy.containsKey(amqDirectToVhParent)); - // queue1binding -> amq.direct // queue1binding -> queue1 UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName()); UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName()); assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid); assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid); + + String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"}; + for (String exchangeName : defaultExchanges) + { + UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()); + UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName()); + assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid); + } + } private void assertExpectedHierarchyEntry( @@ -121,7 +126,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase private void assertConfiguredObjects() { Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - assertEquals("Unexpected number of configured objects", 7, configuredObjects.size()); + assertEquals("Unexpected number of configured objects", 11, configuredObjects.size()); Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>(); @@ -137,6 +142,11 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null)); expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null)); + expected.putAll(createExpectedExchangeMap("amq.direct", "direct")); + expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout")); + expected.putAll(createExpectedExchangeMap("amq.match", "headers")); + expected.putAll(createExpectedExchangeMap("amq.topic", "topic")); + MapJsonSerializer jsonSerializer = new MapJsonSerializer(); for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet()) { |
