diff options
| author | Keith Wall <kwall@apache.org> | 2014-03-25 17:54:10 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-03-25 17:54:10 +0000 |
| commit | cd6130384dc5f27ad494eabf8a2b15ca79280aa1 (patch) | |
| tree | 77d7b1f0ced2cea6b031327fcb5c8143d763cf9d /qpid/java/bdbstore | |
| parent | fcc3f654b60b7dd2180afe73e8809545725b41af (diff) | |
| parent | 809061e0024b74f89afdeff8ba83d6514589f417 (diff) | |
| download | qpid-python-cd6130384dc5f27ad494eabf8a2b15ca79280aa1.tar.gz | |
NO-JIRA: Merge changes from trunk.
Command was:
svn merge https://svn.apache.org/repos/asf/qpid/trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1581428 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
21 files changed, 733 insertions, 149 deletions
diff --git a/qpid/java/bdbstore/pom.xml b/qpid/java/bdbstore/pom.xml index d7d12f0b33..5d34559a46 100644 --- a/qpid/java/bdbstore/pom.xml +++ b/qpid/java/bdbstore/pom.xml @@ -102,6 +102,20 @@ </includes> </resource> </resources> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> </build> </project> diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java new file mode 100644 index 0000000000..f13e4dd08b --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.server.store.ConfiguredObjectRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord +{ + private final UUID _id; + private final String _type; + private final Map<String,Object> _attributes; + private Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); + + public BDBConfiguredObjectRecord(final UUID id, final String type, final Map<String, Object> attributes) + { + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(attributes); + } + + public UUID getId() + { + return _id; + } + + public String getType() + { + return _type; + } + + public Map<String, Object> getAttributes() + { + return _attributes; + } + + void addParent(String parentType, ConfiguredObjectRecord parent) + { + _parents.put(parentType, parent); + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return Collections.unmodifiableMap(_parents); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final BDBConfiguredObjectRecord that = (BDBConfiguredObjectRecord) o; + + if (_attributes != null ? !_attributes.equals(that._attributes) : that._attributes != null) + { + return false; + } + if (_id != null ? !_id.equals(that._id) : that._id != null) + { + return false; + } + if (_type != null ? !_type.equals(that._type) : that._type != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _id != null ? _id.hashCode() : 0; + result = 31 * result + (_type != null ? _type.hashCode() : 0); + return result; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 7e42d09ba6..492ec9d7bf 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -242,10 +242,10 @@ public class BDBHAVirtualHost extends AbstractVirtualHost DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); - _messageStore.recoverConfigurationStore(configRecoverer); + _messageStore.recoverConfigurationStore(getModel(), configRecoverer); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this); - _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); + _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); } catch (Exception e) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 2022f36bd9..ec6ae23367 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -25,6 +25,8 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -54,11 +57,13 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; @@ -94,9 +99,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - public static final int VERSION = 7; + public static final int VERSION = 8; private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; + private static String CONFIGURED_OBJECT_HIERARCHY_DB_NAME = "CONFIGURED_OBJECT_HIERARCHY"; + private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; @@ -106,7 +113,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static String LINKDB_NAME = "LINKS"; private static String XID_DB_NAME = "XIDS"; private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION"; - private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME }; + private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME , CONFIGURED_OBJECT_HIERARCHY_DB_NAME}; private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; private EnvironmentFacade _environmentFacade; @@ -163,7 +170,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) { _configurationStoreStateManager.attainState(State.ACTIVATING); @@ -172,7 +179,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore dbConfig.setAllowCreate(true); try { - new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary(); + new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary(); _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES); } catch(DatabaseException e) @@ -216,7 +223,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException + public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException { _messageStoreStateManager.attainState(State.ACTIVATING); DatabaseConfig dbConfig = new DatabaseConfig(); @@ -224,13 +231,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore dbConfig.setAllowCreate(true); try { - new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary(); + new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary(); _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES); _totalStoreSize = getSizeOnDisk(); } catch(DatabaseException e) { - throw _environmentFacade.handleDatabaseException("Cannot activate message store", e); + throw _environmentFacade.handleDatabaseException("Cannot upgrade message store or open datatbases", e); } if(messageRecoveryHandler != null) @@ -352,10 +359,11 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @SuppressWarnings("resource") private void updateConfigVersion(int newConfigVersion) throws StoreException { + Transaction txn = null; Cursor cursor = null; try { - Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); cursor = getConfigVersionDb().openCursor(txn, null); DatabaseEntry key = new DatabaseEntry(); ByteBinding.byteToEntry((byte) 0,key); @@ -373,10 +381,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore cursor.close(); cursor = null; txn.commit(); + txn = null; } finally { closeCursorSafely(cursor); + abortTransactionIgnoringException("Error setting config version", txn);; } } @@ -412,24 +422,57 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException { - Cursor cursor = null; + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; try { - cursor = getConfiguredObjectsDb().openCursor(null, null); + objectsCursor = getConfiguredObjectsDb().openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = + new HashMap<UUID, BDBConfiguredObjectRecord>(); + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); - crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); + } + + // set parents + hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + else if(hk.getParentType().equals("Exchange")) + { + // TODO - remove this hack for the pre-defined exchanges + child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap())); + } + } } + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + crh.configuredObject(record); + } } finally { - closeCursorSafely(cursor); + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); } } @@ -448,7 +491,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException { StoredMessageRecoveryHandler mrh = msrh.begin(); @@ -568,9 +610,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore dtxrh.completeDtxRecordRecovery(); } - public void removeMessage(long messageId, boolean sync) throws StoreException + void removeMessage(long messageId, boolean sync) throws StoreException { - boolean complete = false; com.sleepycat.je.Transaction tx = null; @@ -715,116 +756,130 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException + public void create(ConfiguredObjectRecord configuredObject) throws StoreException { if (_configurationStoreStateManager.isInState(State.ACTIVE)) { - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); - storeConfiguredObjectEntry(configuredObject); + com.sleepycat.je.Transaction txn = null; + try + { + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + storeConfiguredObjectEntry(txn, configuredObject); + txn.commit(); + txn = null; + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Error creating configured object " + configuredObject + + " in database: " + e.getMessage(), e); + } + finally + { + if (txn != null) + { + abortTransactionIgnoringException("Error creating configured object", txn); + } + } } } @Override - public void remove(UUID id, String type) throws StoreException + public UUID[] remove(final ConfiguredObjectRecord... objects) throws StoreException { - if (LOGGER.isDebugEnabled()) + com.sleepycat.je.Transaction txn = null; + try { - LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called"); + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + for(ConfiguredObjectRecord record : objects) + { + if(removeConfiguredObject(txn, record) == OperationStatus.SUCCESS) + { + removed.add(record.getId()); + } + } + + txn.commit(); + txn = null; + return removed.toArray(new UUID[removed.size()]); } - OperationStatus status = removeConfiguredObject(null, id); - if (status == OperationStatus.NOTFOUND) + catch (DatabaseException e) { - throw new StoreException("Configured object of type " + type + " with id " + id + " not found"); + throw _environmentFacade.handleDatabaseException("Error deleting configured objects from database", e); } - } - - @Override - public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException - { - com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - for(UUID id : objects) + finally { - if(removeConfiguredObject(txn, id) == OperationStatus.SUCCESS) + if (txn != null) { - removed.add(id); + abortTransactionIgnoringException("Error deleting configured objects", txn); } } - commitTransaction(txn); - return removed.toArray(new UUID[removed.size()]); + } - private void commitTransaction(com.sleepycat.je.Transaction txn) throws StoreException + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { + com.sleepycat.je.Transaction txn = null; try { + txn = _environmentFacade.getEnvironment().beginTransaction(null, null); + for(ConfiguredObjectRecord record : records) + { + update(createIfNecessary, record, txn); + } txn.commit(); + txn = null; } - catch(DatabaseException e) + catch (DatabaseException e) { - throw _environmentFacade.handleDatabaseException("Cannot commit transaction on configured objects removal", e); + throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); } - } - - @Override - public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException - { - update(false, id, type, attributes, null); - } - - @Override - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - com.sleepycat.je.Transaction txn = _environmentFacade.getEnvironment().beginTransaction(null, null); - for(ConfiguredObjectRecord record : records) + finally { - update(createIfNecessary, record.getId(), record.getType(), record.getAttributes(), txn); + if (txn != null) + { + abortTransactionIgnoringException("Error updating configuration details within the store", txn); + } } - commitTransaction(txn); + } - private void update(boolean createIfNecessary, UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws StoreException + private void update(boolean createIfNecessary, ConfiguredObjectRecord record, com.sleepycat.je.Transaction txn) throws StoreException { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " + type + ", id: " + id); + LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); } - try - { - DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(id, key); + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); + keyBinding.objectToEntry(record.getId(), key); - DatabaseEntry value = new DatabaseEntry(); - DatabaseEntry newValue = new DatabaseEntry(); - ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); + DatabaseEntry value = new DatabaseEntry(); + DatabaseEntry newValue = new DatabaseEntry(); + ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS || (createIfNecessary && status == OperationStatus.NOTFOUND)) + OperationStatus status = getConfiguredObjectsDb().get(txn, key, value, LockMode.DEFAULT); + final boolean isNewRecord = status == OperationStatus.NOTFOUND; + if (status == OperationStatus.SUCCESS || (createIfNecessary && isNewRecord)) + { + // write the updated entry to the store + configuredObjectBinding.objectToEntry(record, newValue); + status = getConfiguredObjectsDb().put(txn, key, newValue); + if (status != OperationStatus.SUCCESS) { - ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes); - - // write the updated entry to the store - configuredObjectBinding.objectToEntry(newQueueRecord, newValue); - status = getConfiguredObjectsDb().put(txn, key, newValue); - if (status != OperationStatus.SUCCESS) - { - throw new StoreException("Error updating configuration details within the store: " + status); - } + throw new StoreException("Error updating configuration details within the store: " + status); } - else if (status != OperationStatus.NOTFOUND) + if(isNewRecord) { - throw new StoreException("Error finding configuration details within the store: " + status); + writeHierarchyRecords(txn, record); } } - catch (DatabaseException e) + else if (status != OperationStatus.NOTFOUND) { - if (txn != null) - { - abortTransactionIgnoringException("Error updating configuration details within the store: " + e.getMessage(), txn); - } - throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); + throw new StoreException("Error finding configuration details within the store: " + status); } } @@ -1018,7 +1073,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason. */ - public void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException + private void abortTran(final com.sleepycat.je.Transaction tx) throws StoreException { if (LOGGER.isDebugEnabled()) { @@ -1091,7 +1146,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @return A fresh message id. */ - public long getNewMessageId() + private long getNewMessageId() { return _messageId.incrementAndGet(); } @@ -1106,7 +1161,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. */ - protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, + private void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset, ByteBuffer contentBody) throws StoreException { DatabaseEntry key = new DatabaseEntry(); @@ -1183,7 +1238,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. */ - public StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException + StorableMessageMetaData getMessageMetaData(long messageId) throws StoreException { if (LOGGER.isDebugEnabled()) { @@ -1226,7 +1281,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * * @throws StoreException If the operation fails for any reason, or if the specified message does not exist. */ - public int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException + int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException { DatabaseEntry contentKeyEntry = new DatabaseEntry(); LongBinding.longToEntry(messageId, contentKeyEntry); @@ -1291,20 +1346,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - /** - * Makes the specified configured object persistent. - * - * @param configuredObject Details of the configured object to store. - * @throws StoreException If the operation fails for any reason. - */ - private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException + private void storeConfiguredObjectEntry(final Transaction txn, ConfiguredObjectRecord configuredObject) throws StoreException { if (_configurationStoreStateManager.isInState(State.ACTIVE)) { - LOGGER.debug("Storing configured object: " + configuredObject); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Storing configured object: " + configuredObject); + } DatabaseEntry key = new DatabaseEntry(); - UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance(); - keyBinding.objectToEntry(configuredObject.getId(), key); + UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); + uuidBinding.objectToEntry(configuredObject.getId(), key); DatabaseEntry value = new DatabaseEntry(); ConfiguredObjectBinding queueBinding = ConfiguredObjectBinding.getInstance(); @@ -1312,12 +1364,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore queueBinding.objectToEntry(configuredObject, value); try { - OperationStatus status = getConfiguredObjectsDb().put(null, key, value); + OperationStatus status = getConfiguredObjectsDb().put(txn, key, value); if (status != OperationStatus.SUCCESS) { throw new StoreException("Error writing configured object " + configuredObject + " to database: " + status); } + writeHierarchyRecords(txn, configuredObject); } catch (DatabaseException e) { @@ -1326,26 +1379,54 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } } - - private OperationStatus removeConfiguredObject(Transaction tx, UUID id) throws StoreException + + private void writeHierarchyRecords(final Transaction txn, final ConfiguredObjectRecord configuredObject) { + OperationStatus status; + HierarchyKeyBinding hierarchyBinding = HierarchyKeyBinding.getInstance(); + DatabaseEntry hierarchyKey = new DatabaseEntry(); + DatabaseEntry hierarchyValue = new DatabaseEntry(); - LOGGER.debug("Removing configured object: " + id); + for(Map.Entry<String, ConfiguredObjectRecord> parent : configuredObject.getParents().entrySet()) + { + + hierarchyBinding.objectToEntry(new HierarchyKey(configuredObject.getId(), parent.getKey()), hierarchyKey); + UUIDTupleBinding.getInstance().objectToEntry(parent.getValue().getId(), hierarchyValue); + status = getConfiguredObjectHierarchyDb().put(txn, hierarchyKey, hierarchyValue); + if (status != OperationStatus.SUCCESS) + { + throw new StoreException("Error writing configured object " + configuredObject + " parent record to database: " + + status); + } + } + } + + private OperationStatus removeConfiguredObject(Transaction tx, ConfiguredObjectRecord record) throws StoreException + { + UUID id = record.getId(); + Map<String, ConfiguredObjectRecord> parents = record.getParents(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removing configured object: " + id); + } DatabaseEntry key = new DatabaseEntry(); UUIDTupleBinding uuidBinding = UUIDTupleBinding.getInstance(); uuidBinding.objectToEntry(id, key); - try - { - return getConfiguredObjectsDb().delete(tx, key); - } - catch (DatabaseException e) + OperationStatus status = getConfiguredObjectsDb().delete(tx, key); + if(status == OperationStatus.SUCCESS) { - throw _environmentFacade.handleDatabaseException("Error deleting of configured object with id " + id + " from database", e); + for(String parentType : parents.keySet()) + { + DatabaseEntry hierarchyKey = new DatabaseEntry(); + HierarchyKeyBinding keyBinding = HierarchyKeyBinding.getInstance(); + keyBinding.objectToEntry(new HierarchyKey(record.getId(), parentType), hierarchyKey); + getConfiguredObjectHierarchyDb().delete(tx, hierarchyKey); + } } + return status; } - - private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData> { @@ -1687,14 +1768,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _type; } - private Database getMessageContentDb() + private Database getConfiguredObjectsDb() { - return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); + return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); } - private Database getConfiguredObjectsDb() + private Database getConfiguredObjectHierarchyDb() { - return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); + return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECT_HIERARCHY_DB_NAME); + } + + private Database getMessageContentDb() + { + return _environmentFacade.getOpenDatabase(MESSAGE_CONTENT_DB_NAME); } private Database getConfigVersionDb() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java new file mode 100644 index 0000000000..d1c341447e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/HierarchyKey.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.entry; + +import java.util.UUID; + +public class HierarchyKey +{ + private final UUID _childId; + private final String _parentType; + + public HierarchyKey(final UUID childId, final String parentType) + { + _childId = childId; + _parentType = parentType; + } + + public UUID getChildId() + { + return _childId; + } + + public String getParentType() + { + return _parentType; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final HierarchyKey that = (HierarchyKey) o; + + if (!_childId.equals(that._childId)) + { + return false; + } + if (!_parentType.equals(that._parentType)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _childId.hashCode(); + result = 31 * result + _parentType.hashCode(); + return result; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java index bc3beeb78b..38a2215fe7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java @@ -32,6 +32,7 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -80,7 +81,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord _uuid = uuid; } - public ConfiguredObjectRecord entryToObject(TupleInput tupleInput) + public BDBConfiguredObjectRecord entryToObject(TupleInput tupleInput) { String type = tupleInput.readString(); String json = tupleInput.readString(); @@ -88,7 +89,7 @@ public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord try { Map<String,Object> value = mapper.readValue(json, Map.class); - ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value); + BDBConfiguredObjectRecord configuredObject = new BDBConfiguredObjectRecord(_uuid, type, value); return configuredObject; } catch (IOException e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java new file mode 100644 index 0000000000..13adaabfc8 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/HierarchyKeyBinding.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuple; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; +import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; + +import java.util.UUID; + +public class HierarchyKeyBinding extends TupleBinding<HierarchyKey> +{ + + private static final HierarchyKeyBinding INSTANCE = new HierarchyKeyBinding(); + + public static HierarchyKeyBinding getInstance() + { + return INSTANCE; + } + + /** private constructor forces getInstance instead */ + private HierarchyKeyBinding() { } + + public HierarchyKey entryToObject(TupleInput tupleInput) + { + UUID childId = new UUID(tupleInput.readLong(), tupleInput.readLong()); + String parentType = tupleInput.readString(); + + return new HierarchyKey(childId, parentType); + } + + public void objectToEntry(HierarchyKey hk, TupleOutput tupleOutput) + { + UUID uuid = hk.getChildId(); + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + tupleOutput.writeString(hk.getParentType()); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java index adcaef35ef..0ff90a6d77 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java @@ -22,7 +22,9 @@ package org.apache.qpid.server.store.berkeleydb.upgrade; import com.sleepycat.je.Environment; +import org.apache.qpid.server.model.ConfiguredObject; + public interface StoreUpgrade { - void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName); + void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index 87f8afde4a..3588b96e88 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -39,6 +39,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -74,7 +75,7 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade private static final Logger _logger = Logger.getLogger(UpgradeFrom4To5.class); - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { Transaction transaction = null; reportStarting(environment, 4); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java index 46f2afd741..366b6a1c97 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java @@ -40,6 +40,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -118,11 +119,11 @@ public class UpgradeFrom5To6 extends AbstractStoreUpgrade * Queue, Exchange, Bindings entries are stored now as configurable objects * in "CONFIGURED_OBJECTS" table. */ - public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(final Environment environment, final UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { reportStarting(environment, 5); upgradeMessages(environment, handler); - upgradeConfiguredObjectsAndDependencies(environment, handler, virtualHostName); + upgradeConfiguredObjectsAndDependencies(environment, handler, parent.getName()); renameDatabases(environment, null); reportFinished(environment, 6); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java index ce00fd1a48..9dcd291b9d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom6To7.java @@ -27,6 +27,8 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Environment; import com.sleepycat.je.OperationStatus; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; public class UpgradeFrom6To7 extends AbstractStoreUpgrade @@ -35,7 +37,7 @@ public class UpgradeFrom6To7 extends AbstractStoreUpgrade private static final int DEFAULT_CONFIG_VERSION = 0; @Override - public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, String virtualHostName) + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) { reportStarting(environment, 6); DatabaseConfig dbConfig = new DatabaseConfig(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java new file mode 100644 index 0000000000..413acc90c4 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom7To8.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.upgrade; + +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.*; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.berkeleydb.BDBConfiguredObjectRecord; +import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; +import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; +import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class UpgradeFrom7To8 extends AbstractStoreUpgrade +{ + + @Override + public void performUpgrade(Environment environment, UpgradeInteractionHandler handler, ConfiguredObject<?> parent) + { + reportStarting(environment, 7); + + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + + Database hierarchyDb = environment.openDatabase(null, "CONFIGURED_OBJECT_HIERARCHY", dbConfig); + Database configuredObjectsDb = environment.openDatabase(null, "CONFIGURED_OBJECTS", dbConfig); + + Cursor objectsCursor = null; + + Transaction txn = environment.beginTransaction(null, null); + + try + { + objectsCursor = configuredObjectsDb.openCursor(txn, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + Map<UUID, BDBConfiguredObjectRecord> configuredObjects = + new HashMap<UUID, BDBConfiguredObjectRecord>(); + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + TupleInput input = TupleBinding.entryToInput(value); + String type = input.readString(); + + if(!type.endsWith("Binding")) + { + UUIDTupleBinding.getInstance().objectToEntry(parent.getId(),value); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("VirtualHost"); + TupleBinding.outputToEntry(tupleOutput, key); + hierarchyDb.put(txn, key, value); + } + else + { + String json = input.readString(); + ObjectMapper mapper = new ObjectMapper(); + try + { + DatabaseEntry hierarchyKey = new DatabaseEntry(); + DatabaseEntry hierarchyValue = new DatabaseEntry(); + + Map<String,Object> attributes = mapper.readValue(json, Map.class); + Object queueIdString = attributes.remove("queue"); + if(queueIdString instanceof String) + { + UUID queueId = UUID.fromString(queueIdString.toString()); + UUIDTupleBinding.getInstance().objectToEntry(queueId,hierarchyValue); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("Queue"); + TupleBinding.outputToEntry(tupleOutput, hierarchyKey); + hierarchyDb.put(txn, hierarchyKey, hierarchyValue); + } + Object exchangeIdString = attributes.remove("exchange"); + if(exchangeIdString instanceof String) + { + UUID exchangeId = UUID.fromString(exchangeIdString.toString()); + UUIDTupleBinding.getInstance().objectToEntry(exchangeId,hierarchyValue); + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeLong(id.getMostSignificantBits()); + tupleOutput.writeLong(id.getLeastSignificantBits()); + tupleOutput.writeString("Exchange"); + TupleBinding.outputToEntry(tupleOutput, hierarchyKey); + hierarchyDb.put(txn, hierarchyKey, hierarchyValue); + } + TupleOutput tupleOutput = new TupleOutput(); + tupleOutput.writeString(type); + StringWriter writer = new StringWriter(); + mapper.writeValue(writer,attributes); + tupleOutput.writeString(writer.getBuffer().toString()); + TupleBinding.outputToEntry(tupleOutput, value); + objectsCursor.putCurrent(value); + } + catch (IOException e) + { + throw new StoreException(e); + } + + } + + + } + + + } + finally + { + if(objectsCursor != null) + { + objectsCursor.close(); + } + } + txn.commit(); + + hierarchyDb.close(); + configuredObjectsDb.close(); + + + + reportFinished(environment, 8); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java index 7852e2d703..e80d60609f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java @@ -26,6 +26,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; @@ -45,12 +47,12 @@ public class Upgrader static final String VERSION_DB_NAME = "DB_VERSION"; private Environment _environment; - private String _virtualHostName; + private ConfiguredObject<?> _parent; - public Upgrader(Environment environment, String virtualHostName) + public Upgrader(Environment environment, ConfiguredObject<?> parent) { _environment = environment; - _virtualHostName = virtualHostName; + _parent = parent; } public void upgradeIfNecessary() @@ -158,7 +160,7 @@ public class Upgrader + "UpgradeFrom"+fromVersion+"To"+toVersion); Constructor<StoreUpgrade> ctr = upgradeClass.getConstructor(); StoreUpgrade upgrade = ctr.newInstance(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _virtualHostName); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, _parent); } catch (ClassNotFoundException e) { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java index 5a5d39081c..965cad1cb5 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.store.ConfiguredObjectRecord; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; public class ConfiguredObjectBindingTest extends TestCase { @@ -46,7 +47,7 @@ public class ConfiguredObjectBindingTest extends TestCase { super.setUp(); _configuredObjectBinding = ConfiguredObjectBinding.getInstance(); - _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, + _object = new ConfiguredObjectRecordImpl(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_MAP); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java index b2b28b3c2d..ce143aba1b 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java @@ -25,12 +25,16 @@ import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPrepare import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.InputStream; +import java.util.UUID; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -167,8 +171,11 @@ public abstract class AbstractUpgradeTestCase extends QpidTestCase return count.longValue(); } - public String getVirtualHostName() + public VirtualHost getVirtualHost() { - return getName(); + VirtualHost virtualHost = mock(VirtualHost.class); + when(virtualHost.getName()).thenReturn(getName()); + when(virtualHost.getId()).thenReturn(UUID.randomUUID()); + return virtualHost; } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java index 500fb0a919..d0f9455d9a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java @@ -73,7 +73,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringYes() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); assertQueues(new HashSet<String>(Arrays.asList(QUEUE_NAMES))); @@ -103,7 +103,7 @@ public class UpgradeFrom4to5Test extends AbstractUpgradeTestCase public void testPerformUpgradeWithHandlerAnsweringNo() throws Exception { UpgradeFrom4To5 upgrade = new UpgradeFrom4To5(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.NO), getVirtualHost()); HashSet<String> queues = new HashSet<String>(Arrays.asList(QUEUE_NAMES)); assertTrue(NON_DURABLE_QUEUE_NAME + " should be in the list of queues" , queues.remove(NON_DURABLE_QUEUE_NAME)); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 701fd94115..0460b1ce4c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -87,7 +87,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase public void testPerformUpgrade() throws Exception { UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + upgrade.performUpgrade(_environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); assertDatabaseRecordCounts(); assertContent(); @@ -101,7 +101,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase corruptDatabase(); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHostName()); + upgrade.performUpgrade(_environment, new StaticAnswerHandler(UpgradeInteractionResponse.YES), getVirtualHost()); assertDatabaseRecordCounts(); @@ -117,7 +117,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase UpgradeInteractionHandler discardMessageInteractionHandler = new StaticAnswerHandler(UpgradeInteractionResponse.NO); - upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHostName()); + upgrade.performUpgrade(_environment, discardMessageInteractionHandler, getVirtualHost()); assertDatabaseRecordCount(NEW_METADATA_DB_NAME, 12); assertDatabaseRecordCount(NEW_CONTENT_DB_NAME, 12); @@ -135,7 +135,7 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { populateOldXidEntries(environment); UpgradeFrom5To6 upgrade = new UpgradeFrom5To6(); - upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHostName()); + upgrade.performUpgrade(environment, UpgradeInteractionHandler.DEFAULT_HANDLER, getVirtualHost()); assertXidEntries(environment); } finally @@ -171,11 +171,11 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase NewRecordImpl[] newDequeues = newTransaction.getDequeues(); assertEquals("Unxpected new enqueus number", 1, newEnqueues.length); NewRecordImpl enqueue = newEnqueues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHostName()), enqueue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST1", getVirtualHost().getName()), enqueue.getId()); assertEquals("Unxpected message id", 1, enqueue.getMessageNumber()); assertEquals("Unxpected new dequeues number", 1, newDequeues.length); NewRecordImpl dequeue = newDequeues[0]; - assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHostName()), dequeue.getId()); + assertEquals("Unxpected queue id", UUIDGenerator.generateQueueUUID("TEST2", getVirtualHost().getName()), dequeue.getId()); assertEquals("Unxpected message id", 2, dequeue.getMessageNumber()); } @@ -347,13 +347,13 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase { String exchangeName = (String) deserialized.get(Exchange.NAME); assertNotNull(exchangeName); - assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName())); + assertEquals("Unexpected key", key, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName())); } else if (type.equals(Queue.class.getName())) { String queueName = (String) deserialized.get(Queue.NAME); assertNotNull(queueName); - assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHostName())); + assertEquals("Unexpected key", key, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName())); } else if (type.equals(Binding.class.getName())) { @@ -368,15 +368,15 @@ public class UpgradeFrom5To6Test extends AbstractUpgradeTestCase private Map<String, Object> createExpectedQueueBindingMapAndID(String queue, String bindingName, String exchangeName, Map<String, String> argumentMap, List<UUID> expectedBindingIDs) { Map<String, Object> expectedQueueBinding = new HashMap<String, Object>(); - expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.QUEUE, UUIDGenerator.generateQueueUUID(queue, getVirtualHost().getName()).toString()); expectedQueueBinding.put(Binding.NAME, bindingName); - expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHostName()).toString()); + expectedQueueBinding.put(Binding.EXCHANGE, UUIDGenerator.generateExchangeUUID(exchangeName, getVirtualHost().getName()).toString()); if (argumentMap != null) { expectedQueueBinding.put(Binding.ARGUMENTS, argumentMap); } - expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHostName())); + expectedBindingIDs.add(UUIDGenerator.generateBindingUUID(exchangeName, queue, bindingName, getVirtualHost().getName())); return expectedQueueBinding; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java index 810f4a1fca..c407be50c6 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderFailOnNewerVersionTest.java @@ -43,7 +43,7 @@ public class UpgraderFailOnNewerVersionTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHostName()); + _upgrader = new Upgrader(_environment, getVirtualHost()); } private int getStoreVersion() diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java index 3465f3582f..4b9a8d19a8 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java @@ -51,7 +51,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase public void setUp() throws Exception { super.setUp(); - _upgrader = new Upgrader(_environment, getVirtualHostName()); + _upgrader = new Upgrader(_environment, getVirtualHost()); } private int getStoreVersion(Environment environment) @@ -108,7 +108,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase Environment emptyEnvironment = createEnvironment(nonExistentStoreLocation); try { - _upgrader = new Upgrader(emptyEnvironment, getVirtualHostName()); + _upgrader = new Upgrader(emptyEnvironment, getVirtualHost()); _upgrader.upgradeIfNecessary(); List<String> databaseNames = emptyEnvironment.getDatabaseNames(); diff --git a/qpid/java/bdbstore/systests/pom.xml b/qpid/java/bdbstore/systests/pom.xml index e8620d3426..fe718f9dac 100644 --- a/qpid/java/bdbstore/systests/pom.xml +++ b/qpid/java/bdbstore/systests/pom.xml @@ -33,6 +33,7 @@ <test.log4j.configuration.file>${project.basedir}${file.separator}..${file.separator}..${file.separator}test-profiles${file.separator}log4j-test.xml</test.log4j.configuration.file> <test.working.directory>${basedir}/../..</test.working.directory> <test.resource.directory>${basedir}/../..</test.resource.directory> + <test.systest.resource.directory>${basedir}/../../systests</test.systest.resource.directory> </properties> <dependencies> @@ -65,6 +66,66 @@ <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-bdbstore</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <!--version specified in parent pluginManagement --> + <executions> + <!-- copy the bdbstore bin contents to where the tests expect them --> + <execution> + <id>copy-bdbstore-bin-resources</id> + <phase>generate-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${qpid.home}</outputDirectory> + <resources> + <resource> + <directory>${basedir}/..</directory> + <includes> + <include>bin/</include> + </includes> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <!-- fix the fact that the maven-resources-plugin copy-resources doesn't maintain file permissions in unix --> + <execution> + <id>fix-bdb-script-permissions</id> + <phase>package</phase> + <configuration> + <target> + <chmod perm="755"> + <fileset dir="${qpid.home}"> + <include name="bin/**"/> + </fileset> + </chmod> + </target> + </configuration> + <goals><goal>run</goal></goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </project> diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index c8fcfe0826..ba84d0682a 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -236,6 +236,7 @@ public class BDBMessageStoreTest extends MessageStoreTest { messageStore.closeMessageStore(); + BDBMessageStore newStore = new BDBMessageStore(); MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -243,7 +244,7 @@ public class BDBMessageStoreTest extends MessageStoreTest VirtualHost<?> virtualHost = getVirtualHostModel(); newStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings()); - newStore.recoverMessageStore(recoveryHandler, null); + newStore.recoverMessageStore(getVirtualHostModel(), recoveryHandler, null); return newStore; } |
