diff options
| author | Alex Rudyy <orudyy@apache.org> | 2013-04-29 14:44:11 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2013-04-29 14:44:11 +0000 |
| commit | 9e30ef98f227273fc2587ee6b67829524035b714 (patch) | |
| tree | 1153729df9fff5678a9855f9b36cab4b18d9ce2e /java/broker | |
| parent | 677ba13c2cebd987011332b224ba36c64ffc3080 (diff) | |
| download | qpid-python-9e30ef98f227273fc2587ee6b67829524035b714.tar.gz | |
QPID-4639: Improve functionality to delete store on virtual host deletion
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1477110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
9 files changed, 402 insertions, 15 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 97bb492484..220d6e1ddd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -80,13 +80,11 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.util.FileUtils; public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, ExchangeRegistry.RegistryChangeListener, QueueRegistry.RegistryChangeListener, IConnectionRegistry.RegistryChangeListener { - private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class); @SuppressWarnings("serial") @@ -1004,24 +1002,21 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'"); } - String storePath = (String)getAttribute(STORE_PATH); if (_virtualHost != null && _virtualHost.getState() == org.apache.qpid.server.virtualhost.State.ACTIVE) { setDesiredState(currentState, State.STOPPED); } + MessageStore ms = _virtualHost.getMessageStore(); _virtualHost = null; - setAttribute(VirtualHost.STATE, getActualState(), State.DELETED); - if (storePath != null) + try { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Deleting store at " + storePath); - } - if (!FileUtils.delete(new File(storePath), true)) - { - LOGGER.warn("Cannot delete " + storePath); - } + ms.onDelete(); } + catch(Exception e) + { + LOGGER.warn("Exception occured on store deletion", e); + } + setAttribute(VirtualHost.STATE, getActualState(), State.DELETED); return true; } return false; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 3428e5735e..6b4b8d4a3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -72,6 +72,9 @@ abstract public class AbstractJDBCMessageStore implements MessageStore private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME, + XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME }; + private static final int DB_VERSION = 6; private final AtomicLong _messageId = new AtomicLong(0); @@ -167,6 +170,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler configRecoveryHandler, @@ -2037,4 +2041,37 @@ abstract public class AbstractJDBCMessageStore implements MessageStore protected abstract void storedSizeChange(int storeSizeIncrease); + + @Override + public void onDelete() + { + try + { + Connection conn = newAutoCommitConnection(); + try + { + for (String tableName : ALL_TABLES) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("DROP TABLE " + tableName); + } + finally + { + stmt.close(); + } + } + } + finally + { + conn.close(); + } + } + catch(SQLException e) + { + getLogger().error("Exception while deleting store tables", e); + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 0acaf164d9..cf8444b089 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -69,4 +69,6 @@ public interface MessageStore extends DurableConfigurationStore String getStoreLocation(); String getStoreType(); + + void onDelete(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index c6bffbc1de..fdb80295cf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -121,4 +121,8 @@ public abstract class NullMessageStore implements MessageStore return null; } + @Override + public void onDelete() + { + } }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 271b7f5551..57024817f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.util.FileUtils; /** * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence @@ -56,7 +57,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; public static final String TYPE = "DERBY"; @@ -418,4 +419,29 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa return TYPE; } + @Override + public void onDelete() + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Deleting store " + _storeLocation); + } + + if (MEMORY_STORE_LOCATION.equals(_storeLocation)) + { + return; + } + + if (_storeLocation != null) + { + File location = new File(_storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + _logger.error("Cannot delete " + _storeLocation); + } + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 2c4b0e8119..42a32c1cbd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -24,7 +24,6 @@ package org.apache.qpid.server.store.jdbc; import java.sql.Blob; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java new file mode 100644 index 0000000000..065d6408de --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.test.utils.QpidTestCase; + +public abstract class MessageStoreTestCase extends QpidTestCase +{ + private ConfigurationRecoveryHandler _recoveryHandler; + private QueueRecoveryHandler _queueRecoveryHandler; + private ExchangeRecoveryHandler _exchangeRecoveryHandler; + private BindingRecoveryHandler _bindingRecoveryHandler; + private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; + private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; + private TransactionLogRecoveryHandler _logRecoveryHandler; + private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; + + private MessageStore _store; + private Configuration _storeConfiguration; + + public void setUp() throws Exception + { + super.setUp(); + + _recoveryHandler = mock(ConfigurationRecoveryHandler.class); + _queueRecoveryHandler = mock(QueueRecoveryHandler.class); + _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); + _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); + _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); + _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); + _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); + _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); + _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + + when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); + when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); + when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); + when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); + when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); + when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); + + _storeConfiguration = new PropertiesConfiguration(); + setUpStoreConfiguration(_storeConfiguration); + + _store = createMessageStore(); + _store.configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); + _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration); + } + + protected abstract void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception; + + protected abstract MessageStore createMessageStore(); + + public MessageStore getStore() + { + return _store; + } + + public Configuration getStoreConfiguration() + { + return _storeConfiguration; + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java new file mode 100644 index 0000000000..1747588bf1 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import java.io.File; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.util.FileUtils; + +public class DerbyMessageStoreTest extends MessageStoreTestCase +{ + private String _storeLocation; + + @Override + public void tearDown() throws Exception + { + try + { + deleteStoreIfExists(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + File location = new File(_storeLocation); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().close(); + assertTrue("Store does not exist at " + _storeLocation, location.exists()); + + getStore().onDelete(); + assertFalse("Store exists at " + _storeLocation, location.exists()); + } + + @Override + protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception + { + _storeLocation = TMP_FOLDER + File.separator + getTestName(); + storeConfiguration.setProperty("environment-path", _storeLocation); + deleteStoreIfExists(); + } + + private void deleteStoreIfExists() + { + File location = new File(_storeLocation); + if (location.exists()) + { + FileUtils.delete(location, true); + } + } + + @Override + protected MessageStore createMessageStore() + { + return new DerbyMessageStore(); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java new file mode 100644 index 0000000000..bb118eaaf7 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.jdbc; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreTestCase; +import org.apache.qpid.server.store.derby.DerbyMessageStore; + +public class JDBCMessageStoreTest extends MessageStoreTestCase +{ + private String _connectionURL; + + @Override + public void tearDown() throws Exception + { + try + { + shutdownDerby(); + } + finally + { + super.tearDown(); + } + } + + public void testOnDelete() throws Exception + { + String[] expectedTables = JDBCMessageStore.ALL_TABLES; + assertTablesExist(expectedTables, true); + getStore().close(); + assertTablesExist(expectedTables, true); + getStore().onDelete(); + assertTablesExist(expectedTables, false); + } + + @Override + protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception + { + _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; + storeConfiguration.addProperty("connectionUrl", _connectionURL); + } + + @Override + protected MessageStore createMessageStore() + { + return new JDBCMessageStore(); + } + + private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException + { + Set<String> existingTables = getTableNames(); + for (String tableName : expectedTables) + { + assertEquals("Table " + tableName + (exists ? " is not found" : " actually exist"), exists, + existingTables.contains(tableName)); + } + } + + private Set<String> getTableNames() throws SQLException + { + Set<String> tableNames = new HashSet<String>(); + Connection conn = null; + try + { + conn = openConnection(); + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" }); + try + { + while (tables.next()) + { + tableNames.add(tables.getString("TABLE_NAME")); + } + } + finally + { + tables.close(); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } + return tableNames; + } + + private Connection openConnection() throws SQLException + { + return DriverManager.getConnection(_connectionURL); + } + + + private void shutdownDerby() throws SQLException + { + Connection connection = null; + try + { + connection = DriverManager.getConnection("jdbc:derby:memory:/" + getTestName() + ";shutdown=true"); + } + catch(SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + throw e; + } + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } +} |
