summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-06-06 08:26:02 +0000
committerAlex Rudyy <orudyy@apache.org>2014-06-06 08:26:02 +0000
commit53fd008b70676ce1382bec414bcd0d86299a4ced (patch)
tree27c3e38bcc0d4a0551d048e44da19618474bc1e0 /qpid/java/bdbstore
parent60cb3d99e3661103d20cdd7a9d599c62fe2d4b8f (diff)
downloadqpid-python-53fd008b70676ce1382bec414bcd0d86299a4ced.tar.gz
QPID-5715: Fix various issues with configuration upgrader to model 2 and store upgraders to version 8
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java65
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java26
3 files changed, 71 insertions, 26 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 39bfdbc6bf..9fc90f9d7c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -25,7 +25,6 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -233,11 +232,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
child.addParent(hk.getParentType(), parent);
}
- else if(hk.getParentType().equals("Exchange"))
- {
- // TODO - remove this hack for the pre-defined exchanges
- child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
- }
}
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
index faccd2fdf4..05841b86ae 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java
@@ -53,6 +53,15 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
{
private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<HashMap<String,Object>>(){};
+ @SuppressWarnings("serial")
+ private Map<String, String> _defaultExchanges = new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }};
+
@Override
public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent)
{
@@ -76,10 +85,10 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
}
configVersionDb.close();
+ String virtualHostName = parent.getName();
Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
- virtualHostAttributes.put("name", parent.getName());
- String virtualHostName = parent.getName();
+ virtualHostAttributes.put("name", virtualHostName);
UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
ConfiguredObjectRecord virtualHostRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
@@ -90,33 +99,41 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
objectsCursor = configuredObjectsDb.openCursor(txn, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
+ ObjectMapper mapper = new ObjectMapper();
while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
TupleInput input = TupleBinding.entryToInput(value);
String type = input.readString();
+ String json = input.readString();
+ Map<String,Object> attributes = null;
+ try
+ {
+ attributes = mapper.readValue(json, MAP_TYPE_REFERENCE);
+ }
+ catch (Exception e)
+ {
+ throw new StoreException(e);
+ }
+ String name = (String)attributes.get("name");
+
+ if (type.equals("Exchange"))
+ {
+ _defaultExchanges.remove(name);
+ }
if(!type.endsWith("Binding"))
{
- UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value);
- TupleOutput tupleOutput = new TupleOutput();
- tupleOutput.writeLong(id.getMostSignificantBits());
- tupleOutput.writeLong(id.getLeastSignificantBits());
- tupleOutput.writeString("VirtualHost");
- TupleBinding.outputToEntry(tupleOutput, key);
- hierarchyDb.put(txn, key, value);
+ storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId);
}
else
{
- String json = input.readString();
- ObjectMapper mapper = new ObjectMapper();
try
{
DatabaseEntry hierarchyKey = new DatabaseEntry();
DatabaseEntry hierarchyValue = new DatabaseEntry();
- Map<String,Object> attributes = mapper.readValue(json, MAP_TYPE_REFERENCE);
Object queueIdString = attributes.remove("queue");
if(queueIdString instanceof String)
{
@@ -165,6 +182,17 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
}
storeConfiguredObjectEntry(configuredObjectsDb, txn, virtualHostRecord);
+ for (Map.Entry<String, String> defaultExchangeEntry : _defaultExchanges.entrySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
+ Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+ exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+ exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+ exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+ ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes);
+ storeConfiguredObjectEntry(configuredObjectsDb, txn, exchangeRecord);
+ storeVirtualHostHierarchyRecord(hierarchyDb, txn, id, virtualHostId);
+ }
txn.commit();
hierarchyDb.close();
@@ -173,6 +201,19 @@ public class UpgradeFrom7To8 extends AbstractStoreUpgrade
reportFinished(environment, 8);
}
+ void storeVirtualHostHierarchyRecord(Database hierarchyDb, Transaction txn, UUID id, UUID virtualHostId)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(virtualHostId, value);
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleOutput.writeLong(id.getMostSignificantBits());
+ tupleOutput.writeLong(id.getLeastSignificantBits());
+ tupleOutput.writeString("VirtualHost");
+ TupleBinding.outputToEntry(tupleOutput, key);
+ hierarchyDb.put(txn, key, value);
+ }
+
private int getConfigVersion(Database configVersionDb)
{
Cursor cursor = null;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
index 9bd5c96fe6..fc7142e9e4 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8Test.java
@@ -70,8 +70,8 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
UpgradeFrom7To8 upgrade = new UpgradeFrom7To8();
upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost());
- assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 7);
- assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 9);
+ assertDatabaseRecordCount(CONFIGURED_OBJECTS_DB_NAME, 11);
+ assertDatabaseRecordCount(CONFIGURED_OBJECT_HIERARCHY_DB_NAME, 13);
assertConfiguredObjects();
assertConfiguredObjectHierarchy();
@@ -81,7 +81,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
private void assertConfiguredObjectHierarchy()
{
Map<UpgradeHierarchyKey, UUID> hierarchy = loadConfiguredObjectHierarchy();
- assertEquals("Unexpected number of configured objects", 9, hierarchy.size());
+ assertEquals("Unexpected number of configured objects", 13, hierarchy.size());
UUID vhUuid = UUIDGenerator.generateVhostUUID(getVirtualHost().getName());
UUID myExchUuid = UUIDGenerator.generateExchangeUUID("myexch", getVirtualHost().getName());
@@ -97,16 +97,21 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
UpgradeHierarchyKey queue1ToVhParent = new UpgradeHierarchyKey(queue1Uuid, VirtualHost.class.getSimpleName());
assertExpectedHierarchyEntry(hierarchy, queue1ToVhParent, vhUuid);
- // ! amq.direct -> virtualhost (This will change when the upgrader is changed to create the default exchanges)
- UpgradeHierarchyKey amqDirectToVhParent = new UpgradeHierarchyKey(amqDirectUuid, VirtualHost.class.getSimpleName());
- assertFalse("amq.direct should not have a binding to virtualhost", hierarchy.containsKey(amqDirectToVhParent));
-
// queue1binding -> amq.direct
// queue1binding -> queue1
UpgradeHierarchyKey queue1BindingToAmqDirect = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Exchange.class.getSimpleName());
UpgradeHierarchyKey queue1BindingToQueue1 = new UpgradeHierarchyKey(queue1ToAmqDirectBindingUuid, Queue.class.getSimpleName());
assertExpectedHierarchyEntry(hierarchy, queue1BindingToAmqDirect, amqDirectUuid);
assertExpectedHierarchyEntry(hierarchy, queue1BindingToQueue1, queue1Uuid);
+
+ String[] defaultExchanges = {"amq.topic", "amq.fanout", "amq.direct", "amq.match"};
+ for (String exchangeName : defaultExchanges)
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName());
+ UpgradeHierarchyKey exchangeParent = new UpgradeHierarchyKey(id, VirtualHost.class.getSimpleName());
+ assertExpectedHierarchyEntry(hierarchy, exchangeParent, vhUuid);
+ }
+
}
private void assertExpectedHierarchyEntry(
@@ -121,7 +126,7 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
private void assertConfiguredObjects()
{
Map<UUID, UpgradeConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- assertEquals("Unexpected number of configured objects", 7, configuredObjects.size());
+ assertEquals("Unexpected number of configured objects", 11, configuredObjects.size());
Map<UUID, Map<String, Object>> expected = new HashMap<UUID, Map<String, Object>>();
@@ -137,6 +142,11 @@ public class UpgradeFrom7To8Test extends AbstractUpgradeTestCase
expected.putAll(createExpectedBindingMap("queue1", "queue1", "myexch", null));
expected.putAll(createExpectedBindingMap("queue2", "queue2", "amq.fanout", null));
+ expected.putAll(createExpectedExchangeMap("amq.direct", "direct"));
+ expected.putAll(createExpectedExchangeMap("amq.fanout", "fanout"));
+ expected.putAll(createExpectedExchangeMap("amq.match", "headers"));
+ expected.putAll(createExpectedExchangeMap("amq.topic", "topic"));
+
MapJsonSerializer jsonSerializer = new MapJsonSerializer();
for (Entry<UUID, UpgradeConfiguredObjectRecord> entry : configuredObjects.entrySet())
{