From db86d03af2ce0f704398c2a9392e91e9637154ec Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 4 Apr 2014 10:21:37 +0000 Subject: QPID-5653: Replace DurableConfigurationStore/MessageStore recoverers with visitors. * MS/DCS impls now have stateless visitXXX methods to retrieve message/configuration data (replaces the recoverXXXX methods) * VH implementations now uses Handlers to perform the recovery operation. * DCS's handler (ConfiguredObjectRecordRecoveverAndUpgrader) currently implemented in terms of the old DefaultUpgradeProvider/DurableConfigurationRecoverer. This will be refactored by a future commit. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1584600 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBHAVirtualHost.java | 17 +- .../server/store/berkeleydb/BDBMessageStore.java | 541 ++++++++--------- .../qpid/server/store/berkeleydb/entry/Xid.java | 52 -- .../server/store/berkeleydb/tuple/XidBinding.java | 2 +- .../store/berkeleydb/BDBMessageStoreTest.java | 436 ++++++++++++++ .../berkeleydb/upgrade/UpgradeFrom5To6Test.java | 2 +- .../store/berkeleydb/BDBMessageStoreTest.java | 666 --------------------- 7 files changed, 690 insertions(+), 1026 deletions(-) delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java create mode 100644 qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java delete mode 100644 qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (limited to 'qpid/java/bdbstore') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index aae0a56a40..a58bc274a9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationRecoverer; +import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; -import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider; +import org.apache.qpid.server.virtualhost.MessageStoreRecoverer; import org.apache.qpid.server.virtualhost.State; -import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import com.sleepycat.je.rep.StateChangeEvent; @@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost { _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this); - - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - upgraderProvider, getEventLogger()); - _messageStore.recoverConfigurationStore(configRecoverer); + ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); + _messageStore.visitConfiguredObjectRecords(upgraderRecoverer); initialiseModel(); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); - _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); + new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); attainActivation(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 8aac9a6247..652e4c135d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -27,8 +27,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -38,29 +36,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler; -import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding; import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding; import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding; @@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; @@ -129,7 +125,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private long _persistentSizeHighThreshold; private final EventManager _eventManager = new EventManager(); - private final String _type; private final EnvironmentFacadeFactory _environmentFacadeFactory; @@ -143,7 +138,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; } @@ -160,18 +154,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (_environmentFacade == null) { - String[] databaseNames = null; + EnvironmentFacadeTask[] initialisationTasks = null; if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false)) { - databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() }; } else { - databaseNames = CONFIGURATION_STORE_DATABASE_NAMES; + initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)}; } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames)); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks); } else { @@ -181,12 +176,88 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) { checkConfigurationStoreOpen(); + try + { + int configVersion = getConfigVersion(); + + handler.begin(configVersion); + doVisitAllConfiguredObjectRecords(handler); + + int newConfigVersion = handler.end(); + if(newConfigVersion != configVersion) + { + updateConfigVersion(newConfigVersion); + } + } + catch (DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e); + } + + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + Map configuredObjects = new HashMap(); + Cursor objectsCursor = null; + Cursor hierarchyCursor = null; + try + { + objectsCursor = getConfiguredObjectsDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + + while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + BDBConfiguredObjectRecord configuredObject = + (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); + configuredObjects.put(configuredObject.getId(), configuredObject); + } + + // set parents + hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); + while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); + if(child != null) + { + ConfiguredObjectRecord parent = configuredObjects.get(parentId); + if(parent != null) + { + child.addParent(hk.getParentType(), parent); + } + else if(hk.getParentType().equals("Exchange")) + { + // TODO - remove this hack for the pre-defined exchanges + child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.emptyMap())); + } + } + } + } + finally + { + closeCursorSafely(objectsCursor); + closeCursorSafely(hierarchyCursor); + } + + for (ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shoudlContinue = handler.handle(record); + if (!shoudlContinue) + { + break; + } + } - recoverConfig(recoveryHandler); } @Override @@ -210,7 +281,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask()); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, + new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask()); } _committer = _environmentFacade.createCommitter(parent.getName()); @@ -218,21 +290,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException - { - checkMessageStoreOpen(); - - if(messageRecoveryHandler != null) - { - recoverMessages(messageRecoveryHandler); - } - if(transactionLogRecoveryHandler != null) - { - recoverQueueEntries(transactionLogRecoveryHandler); - } - } - @Override public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException { @@ -315,27 +372,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException - { - try - { - final int configVersion = getConfigVersion(); - recoveryHandler.beginConfigurationRecovery(this, configVersion); - loadConfiguredObjects(recoveryHandler); - - final int newConfigVersion = recoveryHandler.completeConfigurationRecovery(); - if(newConfigVersion != configVersion) - { - updateConfigVersion(newConfigVersion); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e); - } - - } - @SuppressWarnings("resource") private void updateConfigVersion(int newConfigVersion) throws StoreException { @@ -400,62 +436,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException - { - Cursor objectsCursor = null; - Cursor hierarchyCursor = null; - try - { - objectsCursor = getConfiguredObjectsDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - Map configuredObjects = - new HashMap(); - - while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - UUID id = UUIDTupleBinding.getInstance().entryToObject(key); - - BDBConfiguredObjectRecord configuredObject = - (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value); - configuredObjects.put(configuredObject.getId(), configuredObject); - } - - // set parents - hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null); - while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key); - UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); - BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId()); - if(child != null) - { - ConfiguredObjectRecord parent = configuredObjects.get(parentId); - if(parent != null) - { - child.addParent(hk.getParentType(), parent); - } - else if(hk.getParentType().equals("Exchange")) - { - // TODO - remove this hack for the pre-defined exchanges - child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.emptyMap())); - } - } - } - - for (ConfiguredObjectRecord record : configuredObjects.values()) - { - crh.configuredObject(record); - } - } - finally - { - closeCursorSafely(objectsCursor); - closeCursorSafely(hierarchyCursor); - } - } - private void closeCursorSafely(Cursor cursor) throws StoreException { if (cursor != null) @@ -471,124 +451,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException - { - StoredMessageRecoveryHandler mrh = msrh.begin(); - - Cursor cursor = null; - try - { - cursor = getMessageMetaDataDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); - - long maxId = 0; - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - long messageId = LongBinding.entryToLong(key); - StorableMessageMetaData metaData = valueBinding.entryToObject(value); - - StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); - - mrh.message(message); - - maxId = Math.max(maxId, messageId); - } - - _messageId.set(maxId); - mrh.completeMessageRecovery(); - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover messages", e); - } - finally - { - closeCursorSafely(cursor); - } - } - - private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) - throws StoreException - { - QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this); - - ArrayList entries = new ArrayList(); - - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - QueueEntryKey qek = keyBinding.entryToObject(key); - - entries.add(qek); - } - - try - { - cursor.close(); - } - finally - { - cursor = null; - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - qerh.queueEntry(queueId, messageId); - } - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e); - } - finally - { - closeCursorSafely(cursor); - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); - - cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), - preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); - } - - } - catch (DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e); - } - finally - { - closeCursorSafely(cursor); - } - - - dtxrh.completeDtxRecordRecovery(); - } void removeMessage(long messageId, boolean sync) throws StoreException { @@ -739,6 +601,12 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void create(ConfiguredObjectRecord configuredObject) throws StoreException { checkConfigurationStoreOpen(); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Create " + configuredObject); + } + com.sleepycat.je.Transaction txn = null; try { @@ -832,7 +700,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId()); + LOGGER.debug("Updating, creating " + createIfNecessary + " : " + record); } DatabaseEntry key = new DatabaseEntry(); @@ -890,8 +758,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Enqueuing message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " in transaction " + tx); + + queue.getName() + " with id " + queue.getId() + " in transaction " + tx); } getDeliveryDb().put(tx, key, value); } @@ -899,8 +766,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { LOGGER.error("Failed to enqueue: " + e.getMessage(), e); throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId() - + " to database", e); + + queue.getName() + " with id " + queue.getId() + " to database", e); } } @@ -925,7 +791,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dequeue message id " + messageId + " from queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } try @@ -935,19 +801,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (status == OperationStatus.NOTFOUND) { throw new StoreException("Unable to find message with id " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } else if (status != OperationStatus.SUCCESS) { throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id); + + queue.getName() + " with id " + id); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removed message " + messageId + " on queue " - + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id - + " from delivery db"); + + queue.getName() + " with id " + id); } } @@ -1072,57 +937,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - /** - * Primarily for testing purposes. - * - * @param queueId - * - * @return a list of message ids for messages enqueued for a particular queue - */ - List getEnqueuedMessages(UUID queueId) throws StoreException - { - Cursor cursor = null; - try - { - cursor = getDeliveryDb().openCursor(null, null); - - DatabaseEntry key = new DatabaseEntry(); - - QueueEntryKey dd = new QueueEntryKey(queueId, 0); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(dd, key); - - DatabaseEntry value = new DatabaseEntry(); - - LinkedList messageIds = new LinkedList(); - - OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT); - dd = keyBinding.entryToObject(key); - - while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId)) - { - - messageIds.add(dd.getMessageId()); - status = cursor.getNext(key, value, LockMode.DEFAULT); - if (status == OperationStatus.SUCCESS) - { - dd = keyBinding.entryToObject(key); - } - } - - return messageIds; - } - catch (DatabaseException e) - { - throw new StoreException("Database error: " + e.getMessage(), e); - } - finally - { - closeCursorSafely(cursor); - } - } - /** * Return a valid, currently unused message id. * @@ -1793,12 +1607,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public String getStoreType() - { - return _type; - } - private Database getConfiguredObjectsDb() { return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME); @@ -1902,4 +1710,147 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + + public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler + { + private long _maxId; + + @Override + public void execute(EnvironmentFacade facade) + { + visitMessagesInternal(this, facade); + _messageId.set(_maxId); + } + + @Override + public boolean handle(StoredMessage storedMessage) + { + long id = storedMessage.getMessageNumber(); + if (_maxId { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java new file mode 100644 index 0000000000..6fba1b215e --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -0,0 +1,436 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.util.FileUtils; + +/** + * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against + * the BDB Store as well as additional tests specific to the BDB store-implementation. + */ +public class BDBMessageStoreTest extends MessageStoreTestCase +{ + private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + private String _storeLocation; + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + deleteStoreIfExists(); + } + } + + /** + * Tests that message metadata and content are successfully read back from a + * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to + * verify their ability to co-exist within the store and be successful retrieved. + */ + public void testBDBMessagePersistence() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + + // Create content ByteBuffers. + // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. + // Use a single chunk for the 0-10 message as per broker behaviour. + String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; + + ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); + ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); + + ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); + int bodySize = completeContentBody_0_10.limit(); + + /* + * Create and insert a 0-8 message (metadata and multi-chunk content) + */ + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); + StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + + long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + storedMessage_0_8.addContent(0, firstContentBytes_0_8); + storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); + storedMessage_0_8.flushToStore(); + + /* + * Create and insert a 0-10 message (metadata and content) + */ + MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); + DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); + Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); + + MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); + + MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); + StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + + long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + + storedMessage_0_10.addContent(0, completeContentBody_0_10); + storedMessage_0_10.flushToStore(); + + /* + * reload the store only (read-only) + */ + reopenStore(); + + /* + * Read back and validate the 0-8 message metadata and content + */ + BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore(); + StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8); + + assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); + MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; + + assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); + + MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); + assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); + assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); + assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); + assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); + + ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); + assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); + assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); + assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); + + BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); + assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); + assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); + + ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; + long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); + assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); + String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); + + /* + * Read back and validate the 0-10 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10); + + assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); + assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); + MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; + + assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); + + DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); + assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); + assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); + assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); + assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); + assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); + assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); + + MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); + assertNotNull("MessageProperties were not returned", returnedMsgProps); + assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); + assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); + assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); + + ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; + long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent); + assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); + + String returnedPayloadString_0_10 = new String(recoveredContent.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); + + reopenedBdbStore.closeMessageStore(); + } + + private DeliveryProperties createDeliveryProperties_0_10() + { + DeliveryProperties delProps_0_10 = new DeliveryProperties(); + + delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); + delProps_0_10.setImmediate(true); + delProps_0_10.setExchange("exchange12345"); + delProps_0_10.setRoutingKey("routingKey12345"); + delProps_0_10.setExpiration(5); + delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); + + return delProps_0_10; + } + + private MessageProperties createMessageProperties_0_10(int bodySize) + { + MessageProperties msgProps_0_10 = new MessageProperties(); + msgProps_0_10.setContentLength(bodySize); + msgProps_0_10.setCorrelationId("qwerty".getBytes()); + msgProps_0_10.setContentType("text/html"); + + return msgProps_0_10; + } + + + private MessagePublishInfo createPublishInfoBody_0_8() + { + return new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return new AMQShortString("exchange12345"); + } + + @Override + public void setExchange(AMQShortString exchange) + { + } + + @Override + public boolean isImmediate() + { + return false; + } + + @Override + public boolean isMandatory() + { + return true; + } + + @Override + public AMQShortString getRoutingKey() + { + return new AMQShortString("routingKey12345"); + } + }; + + } + + private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + return new ContentHeaderBody(classForBasic, 1, props, length); + } + + private BasicContentHeaderProperties createContentHeaderProperties_0_8() + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType("text/html"); + props.getHeaders().setString("Test", "MST"); + return props; + } + + public void testGetContentWithOffset() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore) getStore(); + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + // normal case: offset is 0 + ByteBuffer dst = ByteBuffer.allocate(10); + int length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", CONTENT_BYTES.length, length); + byte[] array = dst.array(); + assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); + + // offset is in the middle + dst = ByteBuffer.allocate(10); + length = bdbStore.getContent(messageid_0_8, 5, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + byte[] expected = new byte[10]; + System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // offset beyond the content length + dst = ByteBuffer.allocate(10); + try + { + bdbStore.getContent(messageid_0_8, 15, dst); + fail("Should fail for the offset greater than message size"); + } + catch (RuntimeException e) + { + assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " + + messageid_0_8 + "!", e.getMessage()); + } + + // buffer is smaller then message size + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 0, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + + // buffer is smaller then message size, offset is not 0 + dst = ByteBuffer.allocate(5); + length = bdbStore.getContent(messageid_0_8, 2, dst); + assertEquals("Unexpected length", 5, length); + array = dst.array(); + expected = new byte[5]; + System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); + assertTrue("Unexpected content", Arrays.equals(expected, array)); + } + + /** + * Tests that messages which are added to the store and then removed using the + * public MessageStore interfaces are actually removed from the store by then + * interrogating the store with its own implementation methods and verifying + * expected exceptions are thrown to indicate the message is not present. + */ + public void testMessageCreationAndRemoval() throws Exception + { + BDBMessageStore bdbStore = (BDBMessageStore)getStore(); + + StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + bdbStore.removeMessage(messageid_0_8, true); + + //verify the removal using the BDB store implementation methods directly + try + { + // the next line should throw since the message id should not be found + bdbStore.getMessageMetaData(messageid_0_8); + fail("No exception thrown when message id not found getting metadata"); + } + catch (StoreException e) + { + // pass since exception expected + } + + //expecting no content, allocate a 1 byte + ByteBuffer dst = ByteBuffer.allocate(1); + + assertEquals("Retrieved content when none was expected", + 0, bdbStore.getContent(messageid_0_8, 0, dst)); + } + + private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) + { + ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); + + int bodySize = CONTENT_BYTES.length; + + //create and store the message using the MessageStore interface + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); + StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + + storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.flushToStore(); + + return storedMessage_0_8; + } + + public void testOnDelete() throws Exception + { + String storeLocation = getStore().getStoreLocation(); + + File location = new File(storeLocation); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + getStore().closeMessageStore(); + assertTrue("Store does not exist at " + storeLocation, location.exists()); + + getStore().onDelete(); + assertFalse("Store exists at " + storeLocation, location.exists()); + } + + + @Override + protected Map getStoreSettings() throws Exception + { + _storeLocation = TMP_FOLDER + File.separator + getTestName(); + deleteStoreIfExists(); + Map messageStoreSettings = new HashMap(); + messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation); + return messageStoreSettings; + + } + + private void deleteStoreIfExists() + { + if (_storeLocation != null) + { + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } + } + } + + @Override + protected MessageStore createMessageStore() + { + return new BDBMessageStore(); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java index 0460b1ce4c..717534a6b8 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java @@ -48,7 +48,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.store.berkeleydb.entry.Xid; +import org.apache.qpid.server.store.Xid; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey; import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java deleted file mode 100644 index 465c49e0c4..0000000000 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ /dev/null @@ -1,666 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.MessageStoreTest; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; - -/** - * Subclass of MessageStoreTest which runs the standard tests from the superclass against - * the BDB Store as well as additional tests specific to the BDB store-implementation. - */ -public class BDBMessageStoreTest extends MessageStoreTest -{ - private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(store); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); - - ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - storedMessage_0_8.flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - - storedMessage_0_10.addContent(0, completeContentBody_0_10); - storedMessage_0_10.flushToStore(); - - /* - * reload the store only (read-only) - */ - BDBMessageStore readOnlyStore = reloadStore(bdbStore); - - /* - * Read back and validate the 0-8 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); - assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); - assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - - BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; - long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - - readOnlyStore.closeMessageStore(); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - /** - * Close the provided store and create a new (read-only) store to read back the data. - * - * Use this method instead of reloading the virtual host like other tests in order - * to avoid the recovery handler deleting the message for not being on a queue. - */ - private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception - { - messageStore.closeMessageStore(); - - - BDBMessageStore newStore = new BDBMessageStore(); - - MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); - when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); - VirtualHost virtualHost = getVirtualHostModel(); - newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings()); - - newStore.recoverMessageStore(recoveryHandler, null); - - return newStore; - } - - private MessagePublishInfo createPublishInfoBody_0_8() - { - return new MessagePublishInfo() - { - public AMQShortString getExchange() - { - return new AMQShortString("exchange12345"); - } - - public void setExchange(AMQShortString exchange) - { - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("routingKey12345"); - } - }; - - } - - private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); - return new ContentHeaderBody(classForBasic, 1, props, length); - } - - private BasicContentHeaderProperties createContentHeaderProperties_0_8() - { - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); - props.setContentType("text/html"); - props.getHeaders().setString("Test", "MST"); - return props; - } - - public void testGetContentWithOffset() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - // normal case: offset is 0 - ByteBuffer dst = ByteBuffer.allocate(10); - int length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", CONTENT_BYTES.length, length); - byte[] array = dst.array(); - assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); - - // offset is in the middle - dst = ByteBuffer.allocate(10); - length = bdbStore.getContent(messageid_0_8, 5, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - byte[] expected = new byte[10]; - System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // offset beyond the content length - dst = ByteBuffer.allocate(10); - try - { - bdbStore.getContent(messageid_0_8, 15, dst); - fail("Should fail for the offset greater than message size"); - } - catch (RuntimeException e) - { - assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getMessage()); - } - - // buffer is smaller then message size - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // buffer is smaller then message size, offset is not 0 - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 2, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - } - /** - * Tests that messages which are added to the store and then removed using the - * public MessageStore interfaces are actually removed from the store by then - * interrogating the store with its own implementation methods and verifying - * expected exceptions are thrown to indicate the message is not present. - */ - public void testMessageCreationAndRemoval() throws Exception - { - MessageStore store = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(store); - - StoredMessage storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - bdbStore.removeMessage(messageid_0_8, true); - - //verify the removal using the BDB store implementation methods directly - try - { - // the next line should throw since the message id should not be found - bdbStore.getMessageMetaData(messageid_0_8); - fail("No exception thrown when message id not found getting metadata"); - } - catch (StoreException e) - { - // pass since exception expected - } - - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); - } - private BDBMessageStore assertBDBStore(MessageStore store) - { - - assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass()); - - return (BDBMessageStore) store; - } - - private StoredMessage createAndStoreSingleChunkMessage_0_8(MessageStore store) - { - ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES); - - int bodySize = CONTENT_BYTES.length; - - //create and store the message using the MessageStore interface - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - - storedMessage_0_8.addContent(0, chunk1); - storedMessage_0_8.flushToStore(); - - return storedMessage_0_8; - } - - /** - * Tests transaction commit by utilising the enqueue and dequeue methods available - * in the TransactionLog interface implemented by the store, and verifying the - * behaviour using BDB implementation methods. - */ - public void testTranCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(1L)); - txn.enqueueMessage(mockQueue, new MockMessage(5L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 1L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 5L, val.longValue()); - } - - - /** - * Tests transaction rollback before a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackBeforeCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(21L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(22L)); - txn.enqueueMessage(mockQueue, new MockMessage(23L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 22L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 23L, val.longValue()); - } - - public void testOnDelete() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - BDBMessageStore bdbStore = assertBDBStore(log); - String storeLocation = bdbStore.getStoreLocation(); - - File location = new File(storeLocation); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.closeMessageStore(); - assertTrue("Store does not exist at " + storeLocation, location.exists()); - - bdbStore.onDelete(); - assertFalse("Store exists at " + storeLocation, location.exists()); - } - - /** - * Tests transaction rollback after a commit has occurred by utilising the - * enqueue and dequeue methods available in the TransactionLog interface - * implemented by the store, and verifying the behaviour using BDB - * implementation methods. - */ - public void testTranRollbackAfterCommit() throws Exception - { - MessageStore log = getVirtualHost().getMessageStore(); - - BDBMessageStore bdbStore = assertBDBStore(log); - - final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); - TransactionLogResource mockQueue = new TransactionLogResource() - { - @Override - public String getName() - { - return getId().toString(); - } - - @Override - public UUID getId() - { - return mockQueueId; - } - - @Override - public boolean isDurable() - { - return true; - } - }; - - Transaction txn = log.newTransaction(); - - txn.enqueueMessage(mockQueue, new MockMessage(30L)); - txn.commitTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(31L)); - txn.abortTran(); - - txn = log.newTransaction(); - txn.enqueueMessage(mockQueue, new MockMessage(32L)); - txn.commitTran(); - - List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId); - - assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); - Long val = enqueuedIds.get(0); - assertEquals("First Message is incorrect", 30L, val.longValue()); - val = enqueuedIds.get(1); - assertEquals("Second Message is incorrect", 32L, val.longValue()); - } - - @SuppressWarnings("rawtypes") - private static class MockMessage implements ServerMessage, EnqueueableMessage - { - private long _messageId; - - public MockMessage(long messageId) - { - _messageId = messageId; - } - - public String getInitialRoutingAddress() - { - return null; - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public StoredMessage getStoredMessage() - { - return null; - } - - public boolean isPersistent() - { - return true; - } - - public long getSize() - { - return 0; - } - - public boolean isImmediate() - { - return false; - } - - public long getExpiration() - { - return 0; - } - - public MessageReference newReference() - { - return null; - } - - public long getMessageNumber() - { - return _messageId; - } - - public long getArrivalTime() - { - return 0; - } - - public int getContent(ByteBuffer buf, int offset) - { - return 0; - } - - public ByteBuffer getContent(int offset, int length) - { - return null; - } - - @Override - public Object getConnectionReference() - { - return null; - } - } -} -- cgit v1.2.1