diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-18 20:51:43 +0000 |
| commit | 326b9560c14d1c30eb71c1396858791f9187d11e (patch) | |
| tree | 34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java | |
| parent | 2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff) | |
| download | qpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz | |
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with
another persistent store implementation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
20 files changed, 2770 insertions, 2031 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java new file mode 100644 index 0000000000..fb256abbf5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java @@ -0,0 +1,1013 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.Module; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.module.SimpleModule; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore +{ + private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; + + private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; + + private static final int DEFAULT_CONFIG_VERSION = 0; + + public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME)); + + private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; + private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME; + + private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id, object_type, attributes) VALUES (?,?,?)"; + private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME + + " set object_type =?, attributes = ? where id = ?"; + private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; + + + private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; + + private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " where child_id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; + + private static final Module _module; + static + { + SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null)); + + final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>() + { + @Override + public void serialize(final ConfiguredObject value, + final JsonGenerator jgen, + final SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeString(value.getId().toString()); + } + }; + module.addSerializer(ConfiguredObject.class, serializer); + + _module = module; + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) + { + checkConfigurationStoreOpen(); + + try + { + handler.begin(); + doVisitAllConfiguredObjectRecords(handler); + handler.end(); + } + catch (SQLException e) + { + throw new StoreException("Cannot visit configured object records", e); + } + } + + private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException + { + Connection conn = newAutoCommitConnection(); + Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = getBlobAsString(rs, 3); + final ConfiguredObjectRecordImpl configuredObjectRecord = + new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, + objectMapper.readValue(attributes, Map.class)); + configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); + + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID childId = UUID.fromString(rs.getString(1)); + String parentType = rs.getString(2); + UUID parentId = UUID.fromString(rs.getString(3)); + + ConfiguredObjectRecordImpl child = configuredObjects.get(childId); + ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); + + if(child != null && parent != null) + { + child.addParent(parentType, parent); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + conn.close(); + } + + for(ConfiguredObjectRecord record : configuredObjects.values()) + { + boolean shouldContinue = handler.handle(record); + if (!shouldContinue) + { + break; + } + } + } + + protected abstract void checkConfigurationStoreOpen(); + + protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws StoreException + { + Connection connection = null; + try + { + connection = newConnection(); + + boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); + if(tableExists) + { + int configVersion = getConfigVersion(connection); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader read existing config version " + configVersion); + } + + switch(configVersion) + { + + case 7: + upgradeFromV7(parent); + break; + default: + throw new UnsupportedOperationException("Cannot upgrade from configuration version : " + + configVersion); + } + } + } + catch (SQLException se) + { + throw new StoreException("Failed to upgrade database", se); + } + finally + { + JdbcUtils.closeConnection(connection, getLogger()); + } + + } + + private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException + { + @SuppressWarnings("serial") + Map<String, String> defaultExchanges = new HashMap<String, String>() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}; + + Connection connection = newConnection(); + try + { + String virtualHostName = parent.getName(); + UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); + + String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION; + + boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); + if(tableExists) + { + int configVersion = getConfigVersion(connection); + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader read existing config version " + configVersion); + } + + stringifiedConfigVersion = "0." + configVersion; + } + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); + virtualHostAttributes.put("name", virtualHostName); + + ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); + insertConfiguredObject(virtualHostRecord, connection); + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion); + } + + Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); + List<UUID> others = new ArrayList<UUID>(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + + PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + UUID id = UUID.fromString(rs.getString(1)); + String objectType = rs.getString(2); + if ("VirtualHost".equals(objectType)) + { + continue; + } + Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); + + if(objectType.endsWith("Binding")) + { + bindingsToUpdate.put(id,attributes); + } + else + { + if (objectType.equals("Exchange")) + { + defaultExchanges.remove((String)attributes.get("name")); + } + others.add(id); + } + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonParseException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); + try + { + for (UUID id : others) + { + stmt.setString(1, id.toString()); + stmt.setString(2, "VirtualHost"); + stmt.setString(3, virtualHostId.toString()); + stmt.execute(); + } + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Queue"); + stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); + stmt.execute(); + + stmt.setString(1, bindingEntry.getKey().toString()); + stmt.setString(2,"Exchange"); + stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); + stmt.execute(); + } + } + finally + { + stmt.close(); + } + + for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); + Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); + exchangeAttributes.put("name", defaultExchangeEntry.getKey()); + exchangeAttributes.put("type", defaultExchangeEntry.getValue()); + exchangeAttributes.put("lifetimePolicy", "PERMANENT"); + Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord); + ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents); + insertConfiguredObject(exchangeRecord, connection); + } + + stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) + { + stmt.setString(1, "Binding"); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt.setBinaryStream(2, bis, attributesAsBytes.length); + stmt.setString(3, bindingEntry.getKey().toString()); + stmt.execute(); + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + finally + { + stmt.close(); + } + + if (tableExists) + { + dropConfigVersionTable(connection); + } + + connection.commit(); + } + catch(SQLException e) + { + try + { + connection.rollback(); + } + catch(SQLException re) + { + } + throw e; + } + finally + { + connection.close(); + } + } + + protected abstract Logger getLogger(); + + protected abstract String getSqlBlobType(); + + protected abstract String getSqlVarBinaryType(int size); + + protected abstract String getSqlBigIntType(); + + + protected void createOrOpenConfigurationStoreDatabase() throws StoreException + { + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + createConfiguredObjectsTable(conn); + createConfiguredObjectHierarchyTable(conn); + } + catch (SQLException e) + { + throw new StoreException("Unable to open configuration tables", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + } + + private void dropConfigVersionTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(DROP_CONFIG_VERSION_TABLE); + } + finally + { + stmt.close(); + } + } + } + + private void createConfiguredObjectsTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))"); + } + finally + { + stmt.close(); + } + } + } + + private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException + { + if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME + + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); + } + finally + { + stmt.close(); + } + } + } + + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return JdbcUtils.tableExists(tableName, conn); + } + + private int getConfigVersion(Connection conn) throws SQLException + { + Statement stmt = conn.createStatement(); + try + { + ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); + try + { + + if(rs.next()) + { + return rs.getInt(1); + } + return DEFAULT_CONFIG_VERSION; + } + finally + { + rs.close(); + } + + } + finally + { + stmt.close(); + } + + } + + @Override + public void create(ConfiguredObjectRecord object) throws StoreException + { + checkConfigurationStoreOpen(); + try + { + Connection conn = newConnection(); + try + { + insertConfiguredObject(object, conn); + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error creating ConfiguredObject " + object); + } + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions enabled. + */ + protected Connection newAutoCommitConnection() throws SQLException + { + final Connection connection = newConnection(); + try + { + connection.setAutoCommit(true); + } + catch (SQLException sqlEx) + { + + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + + return connection; + } + + /** + * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED + * isolation and with auto-commit transactions disabled. + */ + protected Connection newConnection() throws SQLException + { + final Connection connection = getConnection(); + try + { + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + } + catch (SQLException sqlEx) + { + try + { + connection.close(); + } + finally + { + throw sqlEx; + } + } + return connection; + } + + protected abstract Connection getConnection() throws SQLException; + + private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException + { + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + boolean exists; + try + { + exists = rs.next(); + + } + finally + { + rs.close(); + } + // If we don't have any data in the result set then we can add this configured object + if (!exists) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + final Map<String, Object> attributes = configuredObject.getAttributes(); + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); + + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + + writeHierarchy(configuredObject, conn); + } + + } + finally + { + stmt.close(); + } + + } + catch (JsonMappingException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + catch (SQLException e) + { + throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + } + + @Override + public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException + { + checkConfigurationStoreOpen(); + + Collection<UUID> removed = new ArrayList<UUID>(objects.length); + try + { + + Connection conn = newAutoCommitConnection(); + try + { + for(ConfiguredObjectRecord record : objects) + { + if(removeConfiguredObject(record.getId(), conn) != 0) + { + removed.add(record.getId()); + } + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); + } + return removed.toArray(new UUID[removed.size()]); + } + + private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException + { + final int results; + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + try + { + stmt.setString(1, id.toString()); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); + try + { + stmt.setString(1, id.toString()); + stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + + return results; + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException + { + checkConfigurationStoreOpen(); + try + { + Connection conn = newConnection(); + try + { + for(ConfiguredObjectRecord record : records) + { + updateConfiguredObject(record, createIfNecessary, conn); + } + conn.commit(); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e); + } + } + + private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, + boolean createIfNecessary, + Connection conn) + throws SQLException, StoreException + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(_module); + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + stmt2.setString(1, configuredObject.getType()); + if (configuredObject.getAttributes() != null) + { + byte[] attributesAsBytes = objectMapper.writeValueAsBytes( + configuredObject.getAttributes()); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt2.setBinaryStream(2, bis, attributesAsBytes.length); + } + else + { + stmt2.setNull(2, Types.BLOB); + } + stmt2.setString(3, configuredObject.getId().toString()); + stmt2.execute(); + } + finally + { + stmt2.close(); + } + } + else if(createIfNecessary) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + final Map<String, Object> attributes = configuredObject.getAttributes(); + byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + writeHierarchy(configuredObject, conn); + } + } + finally + { + rs.close(); + } + } + catch (JsonMappingException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (JsonGenerationException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + catch (IOException e) + { + throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + finally + { + stmt.close(); + } + } + + private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); + try + { + for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet()) + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, parentEntry.getKey()); + insertStmt.setString(3, parentEntry.getValue().getId().toString()); + + insertStmt.execute(); + } + } + finally + { + insertStmt.close(); + } + } + + protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; + + @Override + public void onDelete() + { + // TODO should probably check we are closed + try + { + Connection conn = newAutoCommitConnection(); + try + { + List<String> tables = new ArrayList<String>(); + tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); + + for (String tableName : tables) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute("DROP TABLE " + tableName); + } + catch(SQLException e) + { + getLogger().warn("Failed to drop table '" + tableName + "' :" + e); + } + finally + { + stmt.close(); + } + } + } + finally + { + conn.close(); + } + } + catch(SQLException e) + { + getLogger().error("Exception while deleting store tables", e); + } + } + + private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord + { + + private final UUID _id; + private final String _type; + private final Map<String, Object> _attributes; + private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); + + private ConfiguredObjectRecordImpl(final UUID id, + final String type, + final Map<String, Object> attributes) + { + _id = id; + _type = type; + _attributes = Collections.unmodifiableMap(attributes); + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public String getType() + { + return _type; + } + + private void addParent(String parentType, ConfiguredObjectRecord parent) + { + _parents.put(parentType, parent); + } + + @Override + public Map<String, Object> getAttributes() + { + return _attributes; + } + + @Override + public Map<String, ConfiguredObjectRecord> getParents() + { + return Collections.unmodifiableMap(_parents); + } + + @Override + public String toString() + { + return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents=" + + _parents + "]"; + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 24563bae61..7487315000 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -24,54 +24,32 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.Version; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.JsonSerializer; -import org.codehaus.jackson.map.Module; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializerProvider; -import org.codehaus.jackson.map.module.SimpleModule; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; -abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore +public abstract class AbstractJDBCMessageStore implements MessageStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; - private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; @@ -82,16 +60,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; - private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; - private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY"; - - private static final int DEFAULT_CONFIG_VERSION = 0; - - public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME)); public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, - QUEUE_ENTRY_TABLE_NAME, - XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); + QUEUE_ENTRY_TABLE_NAME, + XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); private static final int DB_VERSION = 8; @@ -103,19 +75,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?"; - private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME; - private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME; - - private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME - + "( message_id, content ) values (?, ?)"; + + "( message_id, content ) values (?, ?)"; private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME - + " WHERE message_id = ?"; + + " WHERE message_id = ?"; private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME - + " WHERE message_id = ?"; + + " WHERE message_id = ?"; private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)"; private static final String SELECT_FROM_META_DATA = @@ -126,282 +94,63 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, private static final String INSERT_INTO_XIDS = "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)"; private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME - + " WHERE format = ? and global_id = ? and branch_id = ?"; + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME; private static final String INSERT_INTO_XID_ACTIONS = "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " + "queue_id, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME - + " WHERE format = ? and global_id = ? and branch_id = ?"; + + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XID_ACTIONS = "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; - private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME - + " ( id, object_type, attributes) VALUES (?,?,?)"; - private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME - + " set object_type =?, attributes = ? where id = ?"; - private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME - + " where id = ?"; - private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME - + " where id = ?"; - private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; - - - private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"; - - private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " where child_id = ?"; - private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME; - - protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - - - private static final Module _module; - static - { - SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null)); - - final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>() - { - @Override - public void serialize(final ConfiguredObject value, - final JsonGenerator jgen, - final SerializerProvider provider) - throws IOException, JsonProcessingException - { - jgen.writeString(value.getId().toString()); - } - }; - module.addSerializer(ConfiguredObject.class, serializer); - - _module = module; - } protected final EventManager _eventManager = new EventManager(); - private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(); - private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + protected abstract boolean isMessageStoreOpen(); - private boolean _initialized; + protected abstract void checkMessageStoreOpen(); - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + protected void setMaximumMessageId() { - if (_configurationStoreOpen.compareAndSet(false, true)) + visitMessages(new MessageHandler() { - initialiseIfNecessary(parent.getName(), storeSettings); - try - { - createOrOpenConfigurationStoreDatabase(); - upgradeIfVersionTableExists(parent); - } - catch(SQLException e) - { - throw new StoreException("Cannot create databases or upgrade", e); - } - } - } - - private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings) - { - if (!_initialized) - { - try - { - implementationSpecificConfiguration(virtualHostName, storeSettings); - } - catch (ClassNotFoundException e) - { - throw new StoreException("Cannot find driver class", e); - } - catch (SQLException e) - { - throw new StoreException("Unexpected exception occured", e); - } - _initialized = true; - } - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) - { - checkConfigurationStoreOpen(); - - try - { - handler.begin(); - doVisitAllConfiguredObjectRecords(handler); - handler.end(); - } - catch (SQLException e) - { - throw new StoreException("Cannot visit configured object records", e); - } - - } - - private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException - { - Connection conn = newAutoCommitConnection(); - Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>(); - final ObjectMapper objectMapper = new ObjectMapper(); - try - { - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); - try - { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - String id = rs.getString(1); - String objectType = rs.getString(2); - String attributes = getBlobAsString(rs, 3); - final ConfiguredObjectRecordImpl configuredObjectRecord = - new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType, - objectMapper.readValue(attributes, Map.class)); - configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord); - - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY); - try + @Override + public boolean handle(StoredMessage<?> storedMessage) { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - UUID childId = UUID.fromString(rs.getString(1)); - String parentType = rs.getString(2); - UUID parentId = UUID.fromString(rs.getString(3)); - - ConfiguredObjectRecordImpl child = configuredObjects.get(childId); - ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId); - - if(child != null && parent != null) - { - child.addParent(parentType, parent); - } - } - } - finally + long id = storedMessage.getMessageNumber(); + if (_messageId.get() < id) { - rs.close(); + _messageId.set(id); } + return true; } - finally - { - stmt.close(); - } - - } - finally - { - conn.close(); - } - - for(ConfiguredObjectRecord record : configuredObjects.values()) - { - boolean shoudlContinue = handler.handle(record); - if (!shoudlContinue) - { - break; - } - } - } - - private void checkConfigurationStoreOpen() - { - if (!_configurationStoreOpen.get()) - { - throw new IllegalStateException("Configuration store is not open"); - } + }); } - private void checkMessageStoreOpen() + protected void upgrade(ConfiguredObject<?> parent) throws StoreException { - if (!_messageStoreOpen.get()) - { - throw new IllegalStateException("Message store is not open"); - } - } - - private void upgradeIfVersionTableExists(ConfiguredObject<?> parent) - throws SQLException { - Connection conn = newAutoCommitConnection(); + Connection conn = null; try { + conn = newAutoCommitConnection(); if (tableExists(DB_VERSION_TABLE_NAME, conn)) { upgradeIfNecessary(parent); } } - finally + catch (SQLException e) { - if (conn != null) - { - conn.close(); - } + throw new StoreException("Failed to upgrade database", e); } - } - - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - if (_messageStoreOpen.compareAndSet(false, true)) + finally { - initialiseIfNecessary(parent.getName(), messageStoreSettings); - try - { - createOrOpenMessageStoreDatabase(); - upgradeIfNecessary(parent); - - visitMessages(new MessageHandler() - { - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - long id = storedMessage.getMessageNumber(); - if (_messageId.get() < id) - { - _messageId.set(id); - } - return true; - } - }); - } - catch (SQLException e) - { - throw new StoreException("Unable to activate message store ", e); - } + JdbcUtils.closeConnection(conn, getLogger()); } } - protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException + private void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException { Connection conn = newAutoCommitConnection(); try @@ -423,7 +172,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, case 6: upgradeFromV6(); case 7: - upgradeFromV7(parent); + upgradeFromV7(); case DB_VERSION: return; default: @@ -447,214 +196,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - private void upgradeFromV6() throws SQLException + private void upgradeFromV7() { - updateDbVersion(7); } - - private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException + private void upgradeFromV6() throws SQLException { - @SuppressWarnings("serial") - Map<String, String> defaultExchanges = new HashMap<String, String>() - {{ - put("amq.direct", "direct"); - put("amq.topic", "topic"); - put("amq.fanout", "fanout"); - put("amq.match", "headers"); - }}; - - Connection connection = newConnection(); - try - { - String virtualHostName = parent.getName(); - UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName); - - String stringifiedConfigVersion = BrokerModel.MODEL_VERSION; - boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection); - if(tableExists) - { - int configVersion = getConfigVersion(connection); - if (getLogger().isDebugEnabled()) - { - getLogger().debug("Upgrader read existing config version " + configVersion); - } - - stringifiedConfigVersion = "0." + configVersion; - } - - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); - virtualHostAttributes.put("modelVersion", stringifiedConfigVersion); - virtualHostAttributes.put("name", virtualHostName); - - ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes); - insertConfiguredObject(virtualHostRecord, connection); - - if (getLogger().isDebugEnabled()) - { - getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion); - } - - Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>(); - List<UUID> others = new ArrayList<UUID>(); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - - PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); - try - { - ResultSet rs = stmt.executeQuery(); - try - { - while (rs.next()) - { - UUID id = UUID.fromString(rs.getString(1)); - String objectType = rs.getString(2); - if ("VirtualHost".equals(objectType)) - { - continue; - } - Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class); - - if(objectType.endsWith("Binding")) - { - bindingsToUpdate.put(id,attributes); - } - else - { - if (objectType.equals("Exchange")) - { - defaultExchanges.remove((String)attributes.get("name")); - } - others.add(id); - } - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonParseException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); - try - { - for (UUID id : others) - { - stmt.setString(1, id.toString()); - stmt.setString(2, "VirtualHost"); - stmt.setString(3, virtualHostId.toString()); - stmt.execute(); - } - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) - { - stmt.setString(1, bindingEntry.getKey().toString()); - stmt.setString(2,"Queue"); - stmt.setString(3, bindingEntry.getValue().remove("queue").toString()); - stmt.execute(); - - stmt.setString(1, bindingEntry.getKey().toString()); - stmt.setString(2,"Exchange"); - stmt.setString(3, bindingEntry.getValue().remove("exchange").toString()); - stmt.execute(); - } - } - finally - { - stmt.close(); - } - - for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet()) - { - UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName); - Map<String, Object> exchangeAttributes = new HashMap<String, Object>(); - exchangeAttributes.put("name", defaultExchangeEntry.getKey()); - exchangeAttributes.put("type", defaultExchangeEntry.getValue()); - exchangeAttributes.put("lifetimePolicy", "PERMANENT"); - Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord); - ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents); - insertConfiguredObject(exchangeRecord, connection); - } - - stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS); - try - { - for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet()) - { - stmt.setString(1, "Binding"); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue()); - - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - stmt.setBinaryStream(2, bis, attributesAsBytes.length); - stmt.setString(3, bindingEntry.getKey().toString()); - stmt.execute(); - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - finally - { - stmt.close(); - } - stmt = connection.prepareStatement(UPDATE_DB_VERSION); - try - { - stmt.setInt(1, 8); - stmt.execute(); - } - finally - { - stmt.close(); - } - - if (tableExists) - { - dropConfigVersionTable(connection); - } - - connection.commit(); - } - catch(SQLException e) - { - try - { - connection.rollback(); - } - catch(SQLException re) - { - } - throw e; - } - finally - { - connection.close(); - } + updateDbVersion(7); } private void updateDbVersion(int newVersion) throws SQLException @@ -680,44 +228,36 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException; + protected abstract Logger getLogger(); - abstract protected Logger getLogger(); + protected abstract String getSqlBlobType(); - abstract protected String getSqlBlobType(); - - abstract protected String getSqlVarBinaryType(int size); - - abstract protected String getSqlBigIntType(); - - @Override - public void upgradeStoreStructure() throws StoreException - { - // TODO acquire connection to the database using the attribute of the parents, - // run the upgrader in a transaction, close the connection. - } - - protected void createOrOpenMessageStoreDatabase() throws SQLException - { - Connection conn = newAutoCommitConnection(); + protected abstract String getSqlVarBinaryType(int size); - createVersionTable(conn); - createQueueEntryTable(conn); - createMetaDataTable(conn); - createMessageContentTable(conn); - createXidTable(conn); - createXidActionTable(conn); - conn.close(); - } + protected abstract String getSqlBigIntType(); - protected void createOrOpenConfigurationStoreDatabase() throws SQLException + protected void createOrOpenMessageStoreDatabase() throws StoreException { - Connection conn = newAutoCommitConnection(); - - createConfiguredObjectsTable(conn); - createConfiguredObjectHierarchyTable(conn); + Connection conn = null; + try + { + conn = newAutoCommitConnection(); - conn.close(); + createVersionTable(conn); + createQueueEntryTable(conn); + createMetaDataTable(conn); + createMessageContentTable(conn); + createXidTable(conn); + createXidActionTable(conn); + } + catch (SQLException e) + { + throw new StoreException("Failed to create message store tables", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } } private void createVersionTable(final Connection conn) throws SQLException @@ -747,56 +287,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - private void dropConfigVersionTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(DROP_CONFIG_VERSION_TABLE); - } - finally - { - stmt.close(); - } - } - } - - private void createConfiguredObjectsTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME - + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))"); - } - finally - { - stmt.close(); - } - } - } - - private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException - { - if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME - + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))"); - } - finally - { - stmt.close(); - } - } - } - private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) @@ -805,7 +295,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, try { stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id " - + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )"); + + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )"); } finally { @@ -909,81 +399,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, protected boolean tableExists(final String tableName, final Connection conn) throws SQLException { - DatabaseMetaData metaData = conn.getMetaData(); - ResultSet rs = metaData.getTables(null, null, "%", null); - - try - { - - while(rs.next()) - { - final String table = rs.getString(3); - if(tableName.equalsIgnoreCase(table)) - { - return true; - } - } - return false; - } - finally - { - rs.close(); - } - } - - private int getConfigVersion(Connection conn) throws SQLException - { - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION); - try - { - - if(rs.next()) - { - return rs.getInt(1); - } - return DEFAULT_CONFIG_VERSION; - } - finally - { - rs.close(); - } - - } - finally - { - stmt.close(); - } - - } - - public void closeMessageStore() - { - if (_messageStoreOpen.compareAndSet(true, false)) - { - if (!_configurationStoreOpen.get()) - { - doClose(); - } - } + return JdbcUtils.tableExists(tableName, conn); } @Override - public void closeConfigurationStore() - { - if (_configurationStoreOpen.compareAndSet(true, false)) - { - if (!_messageStoreOpen.get()) - { - doClose(); - } - } - } - - protected abstract void doClose(); - public StoredMessage addMessage(StorableMessageMetaData metaData) { checkMessageStoreOpen(); @@ -1058,30 +477,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - - @Override - public void create(ConfiguredObjectRecord object) throws StoreException - { - checkConfigurationStoreOpen(); - try - { - Connection conn = newConnection(); - try - { - insertConfiguredObject(object, conn); - conn.commit(); - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error creating ConfiguredObject " + object); - } - } - /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED * isolation and with auto-commit transactions enabled. @@ -1137,6 +532,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, protected abstract Connection getConnection() throws SQLException; + @Override public Transaction newTransaction() { checkMessageStoreOpen(); @@ -1154,13 +550,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if (getLogger().isDebugEnabled()) { getLogger().debug("Enqueuing message " - + messageId - + " on queue " - + queue.getName() - + " with id " + queue.getId() - + " [Connection" - + conn - + "]"); + + messageId + + " on queue " + + queue.getName() + + " with id " + queue.getId() + + " [Connection" + + conn + + "]"); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); @@ -1180,7 +576,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { getLogger().error("Failed to enqueue: " + e.getMessage(), e); throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId() - + " to database", e); + + " to database", e); } } @@ -1205,13 +601,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if(results != 1) { throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + + " with id " + queue.getId()); } if (getLogger().isDebugEnabled()) { getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + + " with id " + queue.getId()); } } finally @@ -1224,7 +620,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { getLogger().error("Failed to dequeue: " + e.getMessage(), e); throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName() - + " with id " + queue.getId() + " from database", e); + + " with id " + queue.getId() + " from database", e); } } @@ -1420,7 +816,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData) - throws SQLException + throws SQLException { if(getLogger().isDebugEnabled()) { @@ -1606,12 +1002,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } catch (SQLException e) { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e); } finally { - closePreparedStatement(stmt); + JdbcUtils.closePreparedStatement(stmt, getLogger()); } } @@ -1640,7 +1036,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, if (offset > size) { throw new StoreException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); + + " for message id " + messageId + "!"); } @@ -1662,13 +1058,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closePreparedStatement(stmt); - closeConnection(conn); + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public boolean isPersistent() { return true; @@ -1904,7 +1301,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger()); } return StoreFuture.IMMEDIATE_FUTURE; } @@ -1927,7 +1324,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, { storeMetaData(conn, _messageId, _metaData); AbstractJDBCMessageStore.this.addContent(conn, _messageId, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); } finally { @@ -1948,310 +1345,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } } - protected void closeConnection(final Connection conn) - { - if(conn != null) - { - try - { - conn.close(); - } - catch (SQLException e) - { - getLogger().error("Problem closing connection", e); - } - } - } - - protected void closePreparedStatement(final PreparedStatement stmt) - { - if (stmt != null) - { - try - { - stmt.close(); - } - catch(SQLException e) - { - getLogger().error("Problem closing prepared statement", e); - } - } - } - + @Override public void addEventListener(EventListener eventListener, Event... events) { _eventManager.addEventListener(eventListener, events); } - private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException - { - try - { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try - { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); - boolean exists; - try - { - exists = rs.next(); - - } - finally - { - rs.close(); - } - // If we don't have any data in the result set then we can add this configured object - if (!exists) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); - try - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, configuredObject.getType()); - if(configuredObject.getAttributes() == null) - { - insertStmt.setNull(3, Types.BLOB); - } - else - { - final Map<String, Object> attributes = configuredObject.getAttributes(); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); - - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); - } - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - - writeHierarchy(configuredObject, conn); - } - - } - finally - { - stmt.close(); - } - - } - catch (JsonMappingException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - catch (SQLException e) - { - throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); - } - } - @Override - public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException - { - checkConfigurationStoreOpen(); - - Collection<UUID> removed = new ArrayList<UUID>(objects.length); - try - { - - Connection conn = newAutoCommitConnection(); - try - { - for(ConfiguredObjectRecord record : objects) - { - if(removeConfiguredObject(record.getId(), conn) != 0) - { - removed.add(record.getId()); - } - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e); - } - return removed.toArray(new UUID[removed.size()]); - } - - private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException - { - final int results; - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); - try - { - stmt.setString(1, id.toString()); - results = stmt.executeUpdate(); - } - finally - { - stmt.close(); - } - stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY); - try - { - stmt.setString(1, id.toString()); - stmt.executeUpdate(); - } - finally - { - stmt.close(); - } - - return results; - } - - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException - { - checkConfigurationStoreOpen(); - try - { - Connection conn = newConnection(); - try - { - for(ConfiguredObjectRecord record : records) - { - updateConfiguredObject(record, createIfNecessary, conn); - } - conn.commit(); - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e); - } - } - - private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, - boolean createIfNecessary, - Connection conn) - throws SQLException, StoreException - { - PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); - try - { - stmt.setString(1, configuredObject.getId().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(_module); - if (rs.next()) - { - PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); - try - { - stmt2.setString(1, configuredObject.getType()); - if (configuredObject.getAttributes() != null) - { - byte[] attributesAsBytes = objectMapper.writeValueAsBytes( - configuredObject.getAttributes()); - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - stmt2.setBinaryStream(2, bis, attributesAsBytes.length); - } - else - { - stmt2.setNull(2, Types.BLOB); - } - stmt2.setString(3, configuredObject.getId().toString()); - stmt2.execute(); - } - finally - { - stmt2.close(); - } - } - else if(createIfNecessary) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); - try - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, configuredObject.getType()); - if(configuredObject.getAttributes() == null) - { - insertStmt.setNull(3, Types.BLOB); - } - else - { - final Map<String, Object> attributes = configuredObject.getAttributes(); - byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes); - ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); - insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); - } - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - writeHierarchy(configuredObject, conn); - } - } - finally - { - rs.close(); - } - } - catch (JsonMappingException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - catch (JsonGenerationException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - catch (IOException e) - { - throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); - } - finally - { - stmt.close(); - } - } - - private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY); - try - { - for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet()) - { - insertStmt.setString(1, configuredObject.getId().toString()); - insertStmt.setString(2, parentEntry.getKey()); - insertStmt.setString(3, parentEntry.getValue().getId().toString()); - - insertStmt.execute(); - } - } - finally - { - insertStmt.close(); - } - } - public void visitMessages(MessageHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2298,10 +1398,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2342,10 +1443,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } + @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2422,8 +1524,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), - dequeues.toArray(new RecordImpl[dequeues.size()]))) + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()]))) { break; } @@ -2437,13 +1539,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } finally { - closeConnection(conn); + JdbcUtils.closeConnection(conn, getLogger()); } } - - protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException; - protected abstract void storedSizeChange(int storeSizeIncrease); @Override @@ -2456,7 +1555,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, try { List<String> tables = new ArrayList<String>(); - tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); tables.addAll(MESSAGE_STORE_TABLE_NAMES); for (String tableName : tables) @@ -2488,57 +1586,4 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, } - private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord - { - - private final UUID _id; - private final String _type; - private final Map<String, Object> _attributes; - private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>(); - - private ConfiguredObjectRecordImpl(final UUID id, - final String type, - final Map<String, Object> attributes) - { - _id = id; - _type = type; - _attributes = Collections.unmodifiableMap(attributes); - } - - @Override - public UUID getId() - { - return _id; - } - - @Override - public String getType() - { - return _type; - } - - private void addParent(String parentType, ConfiguredObjectRecord parent) - { - _parents.put(parentType, parent); - } - - @Override - public Map<String, Object> getAttributes() - { - return _attributes; - } - - @Override - public Map<String, ConfiguredObjectRecord> getParents() - { - return Collections.unmodifiableMap(_parents); - } - - @Override - public String toString() - { - return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents=" - + _parents + "]"; - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java new file mode 100644 index 0000000000..a26e478a50 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + + +import org.apache.log4j.Logger; + +public class JdbcUtils +{ + public static void closeConnection(final Connection conn, final Logger logger) + { + if(conn != null) + { + try + { + conn.close(); + } + catch (SQLException e) + { + logger.error("Problem closing connection", e); + } + } + } + + public static void closePreparedStatement(final PreparedStatement stmt, final Logger logger) + { + if (stmt != null) + { + try + { + stmt.close(); + } + catch(SQLException e) + { + logger.error("Problem closing prepared statement", e); + } + } + } + + public static boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + DatabaseMetaData metaData = conn.getMetaData(); + ResultSet rs = metaData.getTables(null, null, "%", null); + + try + { + + while(rs.next()) + { + final String table = rs.getString(3); + if(tableName.equalsIgnoreCase(table)) + { + return true; + } + } + return false; + } + finally + { + rs.close(); + } + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java new file mode 100644 index 0000000000..0a121ad476 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java @@ -0,0 +1,337 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.AbstractJDBCMessageStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.JdbcUtils; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreException; + +public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore +{ + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false); + + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private long _totalStoreSize; + private boolean _limitBusted; + + private ConfiguredObject<?> _parent; + + @Override + public final void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + DerbyUtils.loadDerbyDriver(); + + doOpen(parent, messageStoreSettings); + + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number + ? ((Number) overfullAttr).longValue() + : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number + ? ((Number) underfullAttr).longValue() + : Long.parseLong(underfullAttr.toString()); + + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + + + createOrOpenMessageStoreDatabase(); + setInitialSize(); + setMaximumMessageId(); + } + } + + protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings); + + @Override + public final void upgradeStoreStructure() throws StoreException + { + checkMessageStoreOpen(); + + upgrade(_parent); + } + + @Override + public final void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + doClose(); + } + } + + protected abstract void doClose(); + + @Override + protected boolean isMessageStoreOpen() + { + return _messageStoreOpen.get(); + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + return DerbyUtils.getBlobAsBytes(rs, col); + } + + @Override + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return DerbyUtils.tableExists(tableName, conn); + } + + @Override + protected void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + JdbcUtils.closeConnection(conn, getLogger()); + throw new StoreException("Exception while processing store size change", e); + } + } + } + } + + private void setInitialSize() + { + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + } + catch (SQLException e) + { + getLogger().error("Unable to set initial store size", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + throw new StoreException("Error establishing on disk size", e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List<String> schemas = new ArrayList<String>(); + List<String> tables = new ArrayList<String>(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + throw new StoreException("Error reducing on disk size", e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closePreparedStatement(cs, getLogger()); + } + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java new file mode 100644 index 0000000000..540e92fac7 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java @@ -0,0 +1,221 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.AbstractJDBCConfigurationStore; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; +import org.apache.qpid.server.store.StoreException; + +/** + * Implementation of a DurableConfigurationStore backed by Apache Derby + * that also provides a MessageStore. + */ +public class DerbyConfigurationStore extends AbstractJDBCConfigurationStore + implements MessageStoreProvider, DurableConfigurationStore +{ + private static final Logger LOGGER = Logger.getLogger(DerbyConfigurationStore.class); + + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + private final MessageStoreWrapper _messageStore = new MessageStoreWrapper(); + + private String _connectionURL; + private String _storeLocation; + + private ConfiguredObject<?> _parent; + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + throws StoreException + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + DerbyUtils.loadDerbyDriver(); + + String databasePath = (String) storeSettings.get(MessageStore.STORE_PATH); + + _storeLocation = databasePath; + _connectionURL = DerbyUtils.createConnectionUrl(parent.getName(), databasePath); + + createOrOpenConfigurationStoreDatabase(); + } + } + + @Override + public void upgradeStoreStructure() throws StoreException + { + checkConfigurationStoreOpen(); + upgradeIfNecessary(_parent); + } + + @Override + protected Connection getConnection() throws SQLException + { + checkConfigurationStoreOpen(); + return DriverManager.getConnection(_connectionURL); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_messageStore.isMessageStoreOpen()) + { + throw new IllegalStateException("Cannot close the store as the provided message store is still open"); + } + + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try + { + DerbyUtils.shutdownDatabase(_connectionURL); + } + catch (SQLException e) + { + throw new StoreException("Error closing configuration store", e); + } + } + } + + @Override + protected String getSqlBlobType() + { + return "blob"; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return "varchar("+size+") for bit data"; + } + + @Override + protected String getSqlBigIntType() + { + return "bigint"; + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + return DerbyUtils.getBlobAsString(rs, col); + } + + @Override + public void onDelete() + { + if (_messageStore.isMessageStoreOpen()) + { + throw new IllegalStateException("Cannot delete the store as the provided message store is still open"); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting store " + _storeLocation); + } + + try + { + DerbyUtils.deleteDatabaseLocation(_storeLocation); + } + catch (StoreException se) + { + LOGGER.debug("Failed to delete the store at location " + _storeLocation); + } + finally + { + _storeLocation = null; + } + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + @Override + protected boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + return DerbyUtils.tableExists(tableName, conn); + } + + @Override + protected void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + private class MessageStoreWrapper extends AbstractDerbyMessageStore + { + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + protected Connection getConnection() throws SQLException + { + checkMessageStoreOpen(); + return DerbyConfigurationStore.this.getConnection(); + } + + @Override + protected void doClose() + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + public String getStoreLocation() + { + return DerbyConfigurationStore.this._storeLocation; + } + + @Override + protected Logger getLogger() + { + return DerbyConfigurationStore.this.getLogger(); + } + } +} diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 6f87e81ba1..9e2a2f63d4 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -21,522 +21,96 @@ package org.apache.qpid.server.store.derby; -import java.io.File; -import java.sql.Blob; -import java.sql.CallableStatement; import java.sql.Connection; -import java.sql.Driver; import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.util.FileUtils; /** - * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence - * mechanism. - * + * Implementation of a MessageStore backed by Apache Derby. */ -public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider, - DurableConfigurationStore +public class DerbyMessageStore extends AbstractDerbyMessageStore { + private static final Logger LOGGER = Logger.getLogger(DerbyMessageStore.class); - private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); - - private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; - - public static final String MEMORY_STORE_LOCATION = ":memory:"; - - private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; - - public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; - - public static final String TYPE = "DERBY"; - - private long _totalStoreSize; - private boolean _limitBusted; - private long _persistentSizeLowThreshold; - private long _persistentSizeHighThreshold; - - protected String _connectionURL; - + private String _connectionURL; private String _storeLocation; - private Class<Driver> _driverClass; - - private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); - - public DerbyMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } @Override - protected String getSqlBlobType() + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) { - return "blob"; - } + String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH); + String name = parent.getName(); - @Override - protected String getSqlVarBinaryType(int size) - { - return "varchar("+size+") for bit data"; + _storeLocation = databasePath; + _connectionURL = DerbyUtils.createConnectionUrl(name, databasePath); } @Override - protected String getSqlBigIntType() + protected Connection getConnection() throws SQLException { - return "bigint"; + checkMessageStoreOpen(); + return DriverManager.getConnection(_connectionURL); } + @Override protected void doClose() { try { - Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true"); - // Shouldn't reach this point - shutdown=true should throw SQLException - conn.close(); - getLogger().error("Unable to shut down the store"); + DerbyUtils.shutdownDatabase(_connectionURL); } catch (SQLException e) { - if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE)) - { - //expected and represents a clean shutdown of this database only, do nothing. - } - else - { - getLogger().error("Exception whilst shutting down the store: " + e); - throw new StoreException("Error closing message store", e); - } + throw new StoreException("Error closing configuration store", e); } } @Override - protected void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) - throws ClassNotFoundException - { - //Update to pick up QPID_WORK and use that as the default location not just derbyDB - _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - - String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);; - - if(databasePath == null) - { - databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - } - - if(!MEMORY_STORE_LOCATION.equals(databasePath)) - { - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) - { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); - } - } - } - - _storeLocation = databasePath; - - Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); - Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); - - _persistentSizeHighThreshold = overfullAttr == null ? -1l : - overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : - underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) - { - _persistentSizeLowThreshold = _persistentSizeHighThreshold; - } - - //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. - _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true"; - - setInitialSize(); - - } - - private void setInitialSize() - { - Connection conn = null; - try - { - - - try - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - } - finally - { - if(conn != null) - { - conn.close(); - - - } - } - } - catch (SQLException e) - { - getLogger().error("Unable to set initial store size", e); - } - } - - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - byte[] bytes = blob.getBytes(1, (int)blob.length()); - return new String(bytes, UTF8_CHARSET); - } - - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - } - - - protected boolean tableExists(final String tableName, final Connection conn) throws SQLException - { - PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY); - try - { - stmt.setString(1, tableName); - ResultSet rs = stmt.executeQuery(); - try - { - return rs.next(); - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - - public String getStoreLocation() - { - return _storeLocation; - } - - protected synchronized void storedSizeChange(final int delta) + public void onDelete() { - if(getPersistentSizeHighThreshold() > 0) + if (isMessageStoreOpen()) { - synchronized(this) - { - // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every - // time, so we do so only when there's been enough change that it is worth looking again. We do this by - // assuming the total size will change by less than twice the amount of the message data change. - long newSize = _totalStoreSize += 3*delta; - - Connection conn = null; - try - { - - if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) - { - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(_totalStoreSize > getPersistentSizeHighThreshold()) - { - _limitBusted = true; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - } - } - else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) - { - long oldSize = _totalStoreSize; - conn = newAutoCommitConnection(); - _totalStoreSize = getSizeOnDisk(conn); - if(oldSize <= _totalStoreSize) - { - - reduceSizeOnDisk(conn); - - _totalStoreSize = getSizeOnDisk(conn); - } - - if(_totalStoreSize < getPersistentSizeLowThreshold()) - { - _limitBusted = false; - _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - } - - - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new StoreException("Exception while processing store size change", e); - } - } + throw new IllegalStateException("Cannot delete the store as the message store is still open"); } - } - - private void reduceSizeOnDisk(Connection conn) - { - CallableStatement cs = null; - PreparedStatement stmt = null; - try - { - String tableQuery = - "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; - stmt = conn.prepareStatement(tableQuery); - ResultSet rs = null; - List<String> schemas = new ArrayList<String>(); - List<String> tables = new ArrayList<String>(); - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - schemas.add(rs.getString(1)); - tables.add(rs.getString(2)); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - - cs = conn.prepareCall - ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); - - for(int i = 0; i < schemas.size(); i++) - { - cs.setString(1, schemas.get(i)); - cs.setString(2, tables.get(i)); - cs.setShort(3, (short) 0); - cs.execute(); - } - } - catch (SQLException e) - { - closeConnection(conn); - throw new StoreException("Error reducing on disk size", e); - } - finally + if (LOGGER.isDebugEnabled()) { - closePreparedStatement(stmt); - closePreparedStatement(cs); + LOGGER.debug("Deleting store " + _storeLocation); } - } - - private long getSizeOnDisk(Connection conn) - { - PreparedStatement stmt = null; try { - String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + - " FROM " + - " SYS.SYSTABLES systabs," + - " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + - " WHERE systabs.tabletype = 'T'"; - - stmt = conn.prepareStatement(sizeQuery); - - ResultSet rs = null; - long size = 0l; - - try - { - rs = stmt.executeQuery(); - while(rs.next()) - { - size = rs.getLong(1); - } - } - finally - { - if(rs != null) - { - rs.close(); - } - } - - return size; - + DerbyUtils.deleteDatabaseLocation(_storeLocation); } - catch (SQLException e) + catch (StoreException se) { - closeConnection(conn); - throw new StoreException("Error establishing on disk size", e); + LOGGER.debug("Failed to delete the store at location " + _storeLocation); } finally { - closePreparedStatement(stmt); + _storeLocation = null; } - - } - - - private long getPersistentSizeLowThreshold() - { - return _persistentSizeLowThreshold; - } - - private long getPersistentSizeHighThreshold() - { - return _persistentSizeHighThreshold; } @Override - public void onDelete() + protected Logger getLogger() { - if (_logger.isDebugEnabled()) - { - _logger.debug("Deleting store " + _storeLocation); - } - - if (MEMORY_STORE_LOCATION.equals(_storeLocation)) - { - return; - } - - if (_storeLocation != null) - { - File location = new File(_storeLocation); - if (location.exists()) - { - if (!FileUtils.delete(location, true)) - { - _logger.error("Cannot delete " + _storeLocation); - } - } - } + return LOGGER; } - protected Connection getConnection() throws SQLException - { - return DriverManager.getConnection(_connectionURL); - } @Override - public MessageStore getMessageStore() + public String getStoreLocation() { - return _messageStoreFacade; + return _storeLocation; } - private class MessageStoreWrapper implements MessageStore - { - - @Override - public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) - { - DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return DerbyMessageStore.this.addMessage(metaData); - } - - @Override - public boolean isPersistent() - { - return DerbyMessageStore.this.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return DerbyMessageStore.this.newTransaction(); - } - - @Override - public void closeMessageStore() - { - DerbyMessageStore.this.closeMessageStore(); - } - - @Override - public void addEventListener(final EventListener eventListener, final Event... events) - { - DerbyMessageStore.this.addEventListener(eventListener, events); - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - } - - @Override - public String getStoreLocation() - { - return DerbyMessageStore.this.getStoreLocation(); - } - - @Override - public void onDelete() - { - DerbyMessageStore.this.onDelete(); - } - - @Override - public void visitMessages(final MessageHandler handler) throws StoreException - { - DerbyMessageStore.this.visitMessages(handler); - } - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - DerbyMessageStore.this.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - DerbyMessageStore.this.visitDistributedTransactions(handler); - } - } } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java new file mode 100644 index 0000000000..b0f4a137f2 --- /dev/null +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java @@ -0,0 +1,165 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.derby; + + +import java.io.File; +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.util.FileUtils; + +public class DerbyUtils +{ + public static final String MEMORY_STORE_LOCATION = ":memory:"; + public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + private static final String TABLE_EXISTENCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?"; + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + + public static void loadDerbyDriver() + { + try + { + Class<Driver> driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); + } + catch (ClassNotFoundException e) + { + throw new StoreException("Failed to load driver " + SQL_DRIVER_NAME, e); + } + } + + public static String createConnectionUrl(final String name, final String databasePath) + { + // Derby wont use an existing directory, so we append parent name + if (MEMORY_STORE_LOCATION.equals(databasePath)) + { + return "jdbc:derby:" + MEMORY_STORE_LOCATION + "/" + name + ";create=true"; + } + else + { + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) + { + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + + environmentPath + + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } + } + return "jdbc:derby:" + databasePath + "/" + name + ";create=true"; + } + + } + + public static void shutdownDatabase(String connectionURL) throws SQLException + { + try + { + Connection conn = DriverManager.getConnection(connectionURL + ";shutdown=true"); + // Shouldn't reach this point - shutdown=true should throw SQLException + conn.close(); + } + catch (SQLException e) + { + if (e.getSQLState().equalsIgnoreCase(DerbyUtils.DERBY_SINGLE_DB_SHUTDOWN_CODE)) + { + //expected and represents a clean shutdown of this database only, do nothing. + } + else + { + throw e; + } + } + } + + public static void deleteDatabaseLocation(String storeLocation) + { + if (MEMORY_STORE_LOCATION.equals(storeLocation)) + { + return; + } + + if (storeLocation != null) + { + File location = new File(storeLocation); + if (location.exists()) + { + if (!FileUtils.delete(location, true)) + { + throw new StoreException("Failed to delete the store at location : " + storeLocation); + } + } + } + } + + public static String getBlobAsString(ResultSet rs, int col) throws SQLException + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + byte[] bytes = blob.getBytes(1, (int) blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + } + + public static boolean tableExists(final String tableName, final Connection conn) throws SQLException + { + PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTENCE_QUERY); + try + { + stmt.setString(1, tableName); + ResultSet rs = stmt.executeQuery(); + try + { + return rs.next(); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + + +} + diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java index 31c3f7c944..fc67d2fa50 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java @@ -45,7 +45,7 @@ public class DerbyVirtualHost extends AbstractVirtualHost<DerbyVirtualHost> @Override protected MessageStore createMessageStore() { - return new DerbyMessageStore().getMessageStore(); + return new DerbyMessageStore(); } } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java index ffc19832fa..340d046033 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.derby.DerbyMessageStore; +import org.apache.qpid.server.store.derby.DerbyConfigurationStore; import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode; import org.apache.qpid.server.virtualhostnode.FileBasedVirtualHostNode; @@ -49,7 +49,7 @@ public class DerbyVirtualHostNode extends AbstractStandardVirtualHostNode<DerbyV @Override protected DurableConfigurationStore createConfigurationStore() { - return new DerbyMessageStore(); + return new DerbyConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java index aaf65e9ee0..08c421b606 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java @@ -26,9 +26,9 @@ public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigura { @Override - protected DerbyMessageStore createConfigStore() throws Exception + protected DerbyConfigurationStore createConfigStore() throws Exception { - return new DerbyMessageStore(); + return new DerbyConfigurationStore(); } } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index 1d35b9ef83..ba7ae26292 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes @Override protected MessageStore createStore() throws Exception { - return (new DerbyMessageStore()).getMessageStore(); + return new DerbyMessageStore(); } @Override diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 4594b7f223..9a2d945494 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return (new DerbyMessageStore()).getMessageStore(); + return new DerbyMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java new file mode 100644 index 0000000000..bd245fa4f4 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; + +public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore +{ + private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false); + private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>(); + + private ConfiguredObject<?> _parent; + + @Override + public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + if (_messageStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + + doOpen(parent, storeSettings); + + createOrOpenMessageStoreDatabase(); + setMaximumMessageId(); + } + } + + protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) + throws StoreException; + + @Override + public final void upgradeStoreStructure() throws StoreException + { + checkMessageStoreOpen(); + + upgrade(_parent); + } + + @Override + public final void closeMessageStore() + { + if (_messageStoreOpen.compareAndSet(true, false)) + { + try + { + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } + } + finally + { + doClose(); + } + + } + } + + protected abstract void doClose(); + + protected boolean isMessageStoreOpen() + { + return _messageStoreOpen.get(); + } + + @Override + protected void checkMessageStoreOpen() + { + if (!_messageStoreOpen.get()) + { + throw new IllegalStateException("Message store is not open"); + } + } + + @Override + protected void storedSizeChange(int contentSize) + { + } + + @Override + public Transaction newTransaction() + { + return new RecordedJDBCTransaction(); + } + + + private class RecordedJDBCTransaction extends JDBCTransaction + { + private RecordedJDBCTransaction() + { + super(); + GenericAbstractJDBCMessageStore.this._transactions.add(this); + } + + @Override + public void commitTran() + { + try + { + super.commitTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public StoreFuture commitTranAsync() + { + try + { + return super.commitTranAsync(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + + @Override + public void abortTran() + { + try + { + super.abortTran(); + } + finally + { + GenericAbstractJDBCMessageStore.this._transactions.remove(this); + } + } + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java new file mode 100644 index 0000000000..7bafe5a859 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.jdbc; + + +import java.nio.charset.Charset; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.*; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a DurableConfigurationStore backed by Generic JDBC Database + * that also provides a MessageStore. + */ +public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider +{ + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class); + + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean(); + private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + private ConfiguredObject<?> _parent; + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + throws StoreException + { + if (_configurationStoreOpen.compareAndSet(false, true)) + { + _parent = parent; + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + + JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = JDBCDetails.getDefaultDetails(); + } + + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + LOGGER.warn("Unknown connection pool type: " + + connectionPoolType + + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + createOrOpenConfigurationStoreDatabase(); + } + } + + @Override + public void upgradeStoreStructure() throws StoreException + { + checkConfigurationStoreOpen(); + upgradeIfNecessary(_parent); + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + if (_configurationStoreOpen.compareAndSet(true, false)) + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + protected String getBlobAsString(ResultSet rs, int col) throws SQLException + { + byte[] bytes; + if(_useBytesMethodsForBlob) + { + bytes = rs.getBytes(col); + return new String(bytes,UTF8_CHARSET); + } + else + { + Blob blob = rs.getBlob(col); + if(blob == null) + { + return null; + } + bytes = blob.getBytes(1, (int)blob.length()); + } + return new String(bytes, UTF8_CHARSET); + + } + + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + protected void checkConfigurationStoreOpen() + { + if (!_configurationStoreOpen.get()) + { + throw new IllegalStateException("Configuration store is not open"); + } + } + + @Override + protected Logger getLogger() + { + return LOGGER; + } + + @Override + public MessageStore getMessageStore() + { + return _messageStoreFacade; + } + + private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore + { + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + protected Connection getConnection() throws SQLException + { + return GenericJDBCConfigurationStore.this.getConnection(); + } + + @Override + protected void doClose() + { + // Nothing to do, store provided by DerbyConfigurationStore + } + + @Override + public String getStoreLocation() + { + return GenericJDBCConfigurationStore.this._connectionURL; + } + + @Override + protected Logger getLogger() + { + return GenericJDBCConfigurationStore.this.getLogger(); + } + + @Override + protected String getSqlBlobType() + { + return GenericJDBCConfigurationStore.this.getSqlBlobType(); + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size); + } + + @Override + protected String getSqlBigIntType() + { + return GenericJDBCConfigurationStore.this.getSqlBigIntType(); + } + + @Override + protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException + { + return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col); + } + } + + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java new file mode 100644 index 0000000000..dad4432183 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java @@ -0,0 +1,177 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.jdbc; + + +import java.sql.Blob; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.MapValueConverter; + +/** + * Implementation of a MessageStore backed by a Generic JDBC Database. + */ +public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore +{ + + private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class); + + public static final String TYPE = "JDBC"; + public static final String CONNECTION_URL = "connectionUrl"; + public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; + public static final String JDBC_BIG_INT_TYPE = "bigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; + public static final String JDBC_BLOB_TYPE = "blobType"; + + protected String _connectionURL; + private ConnectionProvider _connectionProvider; + + private String _blobType; + private String _varBinaryType; + private String _bigIntType; + private boolean _useBytesMethodsForBlob; + + + @Override + protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException + { + _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); + + org.apache.qpid.server.store.jdbc.JDBCDetails details = null; + + String[] components = _connectionURL.split(":", 3); + if(components.length >= 2) + { + String vendor = components[1]; + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor); + } + + if(details == null) + { + getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); + + details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails(); + } + + + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, + storeSettings, + details.getBigintType()); + + Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); + String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); + + JDBCConnectionProviderFactory connectionProviderFactory = + JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); + if(connectionProviderFactory == null) + { + _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); + connectionProviderFactory = new DefaultConnectionProviderFactory(); + } + + try + { + _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); + } + catch (SQLException e) + { + throw new StoreException("Failed to create connection provider for " + _connectionURL); + } + + } + + @Override + protected Connection getConnection() throws SQLException + { + return _connectionProvider.getConnection(); + } + + protected void doClose() + { + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } + } + + + @Override + protected Logger getLogger() + { + return _logger; + } + + @Override + protected String getSqlBlobType() + { + return _blobType; + } + + @Override + protected String getSqlVarBinaryType(int size) + { + return String.format(_varBinaryType, size); + } + + @Override + protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException + { + if(_useBytesMethodsForBlob) + { + return rs.getBytes(col); + } + else + { + Blob dataAsBlob = rs.getBlob(col); + return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); + + } + } + + @Override + public String getSqlBigIntType() + { + return _bigIntType; + } + + @Override + public String getStoreLocation() + { + return _connectionURL; + } + +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java new file mode 100644 index 0000000000..6cf1413b83 --- /dev/null +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.store.jdbc; + +import java.util.HashMap; +import java.util.Map; + +public class JDBCDetails +{ + + private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>(); + + private static JDBCDetails DERBY_DETAILS = + new JDBCDetails("derby", + "blob", + "varchar(%d) for bit data", + "bigint", + false); + + private static JDBCDetails POSTGRESQL_DETAILS = + new JDBCDetails("postgresql", + "bytea", + "bytea", + "bigint", + true); + + private static JDBCDetails MYSQL_DETAILS = + new JDBCDetails("mysql", + "blob", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails SYBASE_DETAILS = + new JDBCDetails("sybase", + "image", + "varbinary(%d)", + "bigint", + false); + + + private static JDBCDetails ORACLE_DETAILS = + new JDBCDetails("oracle", + "blob", + "raw(%d)", + "number", + false); + + + static + { + + addDetails(DERBY_DETAILS); + addDetails(POSTGRESQL_DETAILS); + addDetails(MYSQL_DETAILS); + addDetails(SYBASE_DETAILS); + addDetails(ORACLE_DETAILS); + } + + public static JDBCDetails getDetails(String vendor) + { + return VENDOR_DETAILS.get(vendor); + } + + public static JDBCDetails getDefaultDetails() + { + return DERBY_DETAILS; + } + + private static void addDetails(JDBCDetails details) + { + VENDOR_DETAILS.put(details.getVendor(), details); + } + + private final String _vendor; + private String _blobType; + private String _varBinaryType; + private String _bigintType; + private boolean _useBytesMethodsForBlob; + + JDBCDetails(String vendor, + String blobType, + String varBinaryType, + String bigIntType, + boolean useBytesMethodsForBlob) + { + _vendor = vendor; + setBlobType(blobType); + setVarBinaryType(varBinaryType); + setBigintType(bigIntType); + setUseBytesMethodsForBlob(useBytesMethodsForBlob); + } + + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + JDBCDetails that = (JDBCDetails) o; + + if (!getVendor().equals(that.getVendor())) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return getVendor().hashCode(); + } + + @Override + public String toString() + { + return "JDBCDetails{" + + "vendor='" + getVendor() + '\'' + + ", blobType='" + getBlobType() + '\'' + + ", varBinaryType='" + getVarBinaryType() + '\'' + + ", bigIntType='" + getBigintType() + '\'' + + ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + + '}'; + } + + public String getVendor() + { + return _vendor; + } + + public String getBlobType() + { + return _blobType; + } + + public void setBlobType(String blobType) + { + _blobType = blobType; + } + + public String getVarBinaryType() + { + return _varBinaryType; + } + + public void setVarBinaryType(String varBinaryType) + { + _varBinaryType = varBinaryType; + } + + public boolean isUseBytesMethodsForBlob() + { + return _useBytesMethodsForBlob; + } + + public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) + { + _useBytesMethodsForBlob = useBytesMethodsForBlob; + } + + public String getBigintType() + { + return _bigintType; + } + + public void setBigintType(String bigintType) + { + _bigintType = bigintType; + } +} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java deleted file mode 100644 index 61ecb6748c..0000000000 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ /dev/null @@ -1,522 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.store.jdbc; - - -import java.sql.Blob; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; -import org.apache.qpid.server.store.AbstractJDBCMessageStore; -import org.apache.qpid.server.store.Event; -import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreProvider; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.util.MapValueConverter; - -/** - * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence - * mechanism. - * - */ -public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider -{ - - private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - - public static final String TYPE = "JDBC"; - public static final String CONNECTION_URL = "connectionUrl"; - public static final String CONNECTION_POOL_TYPE = "connectionPoolType"; - public static final String JDBC_BIG_INT_TYPE = "bigIntType"; - public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob"; - public static final String JDBC_VARBINARY_TYPE = "varbinaryType"; - public static final String JDBC_BLOB_TYPE = "blobType"; - - protected String _connectionURL; - private ConnectionProvider _connectionProvider; - - private final MessageStore _messageStoreFacade = new MessageStoreWrapper(); - - private static class JDBCDetails - { - private final String _vendor; - private String _blobType; - private String _varBinaryType; - private String _bigintType; - private boolean _useBytesMethodsForBlob; - - private JDBCDetails(String vendor, - String blobType, - String varBinaryType, - String bigIntType, - boolean useBytesMethodsForBlob) - { - _vendor = vendor; - setBlobType(blobType); - setVarBinaryType(varBinaryType); - setBigintType(bigIntType); - setUseBytesMethodsForBlob(useBytesMethodsForBlob); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - JDBCDetails that = (JDBCDetails) o; - - if (!getVendor().equals(that.getVendor())) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - return getVendor().hashCode(); - } - - @Override - public String toString() - { - return "JDBCDetails{" + - "vendor='" + getVendor() + '\'' + - ", blobType='" + getBlobType() + '\'' + - ", varBinaryType='" + getVarBinaryType() + '\'' + - ", bigIntType='" + getBigintType() + '\'' + - ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() + - '}'; - } - - public String getVendor() - { - return _vendor; - } - - public String getBlobType() - { - return _blobType; - } - - public void setBlobType(String blobType) - { - _blobType = blobType; - } - - public String getVarBinaryType() - { - return _varBinaryType; - } - - public void setVarBinaryType(String varBinaryType) - { - _varBinaryType = varBinaryType; - } - - public boolean isUseBytesMethodsForBlob() - { - return _useBytesMethodsForBlob; - } - - public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob) - { - _useBytesMethodsForBlob = useBytesMethodsForBlob; - } - - public String getBigintType() - { - return _bigintType; - } - - public void setBigintType(String bigintType) - { - _bigintType = bigintType; - } - } - - private static JDBCDetails DERBY_DETAILS = - new JDBCDetails("derby", - "blob", - "varchar(%d) for bit data", - "bigint", - false); - - private static JDBCDetails POSTGRESQL_DETAILS = - new JDBCDetails("postgresql", - "bytea", - "bytea", - "bigint", - true); - - private static JDBCDetails MYSQL_DETAILS = - new JDBCDetails("mysql", - "blob", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails SYBASE_DETAILS = - new JDBCDetails("sybase", - "image", - "varbinary(%d)", - "bigint", - false); - - - private static JDBCDetails ORACLE_DETAILS = - new JDBCDetails("oracle", - "blob", - "raw(%d)", - "number", - false); - - - private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>(); - - static - { - - addDetails(DERBY_DETAILS); - addDetails(POSTGRESQL_DETAILS); - addDetails(MYSQL_DETAILS); - addDetails(SYBASE_DETAILS); - addDetails(ORACLE_DETAILS); - } - - private static void addDetails(JDBCDetails details) - { - VENDOR_DETAILS.put(details.getVendor(), details); - } - - private String _blobType; - private String _varBinaryType; - private String _bigIntType; - private boolean _useBytesMethodsForBlob; - - private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>(); - - - public JDBCMessageStore() - { - } - - protected Logger getLogger() - { - return _logger; - } - - protected String getSqlBlobType() - { - return _blobType; - } - - protected String getSqlVarBinaryType(int size) - { - return String.format(_varBinaryType, size); - } - - public String getSqlBigIntType() - { - return _bigIntType; - } - - @Override - protected void doClose() - { - try - { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } - } - finally - { - try - { - _connectionProvider.close(); - } - catch (SQLException e) - { - throw new StoreException("Unable to close connection provider ", e); - } - } - } - - - protected Connection getConnection() throws SQLException - { - return _connectionProvider.getConnection(); - } - - - protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings) - throws ClassNotFoundException, SQLException - { - _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); - Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE); - - JDBCDetails details = null; - - String[] components = _connectionURL.split(":",3); - if(components.length >= 2) - { - String vendor = components[1]; - details = VENDOR_DETAILS.get(vendor); - } - - if(details == null) - { - getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL); - - // TODO - is there a better default than derby - details = DERBY_DETAILS; - } - - String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute); - - JDBCConnectionProviderFactory connectionProviderFactory = - JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); - if(connectionProviderFactory == null) - { - _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used"); - connectionProviderFactory = new DefaultConnectionProviderFactory(); - } - - _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings); - _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType()); - _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType()); - _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob()); - _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType()); - } - - @Override - protected void storedSizeChange(int contentSize) - { - } - - public String getStoreLocation() - { - return _connectionURL; - } - - @Override - protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException - { - if(_useBytesMethodsForBlob) - { - return rs.getBytes(col); - } - else - { - Blob dataAsBlob = rs.getBlob(col); - return dataAsBlob.getBytes(1,(int) dataAsBlob.length()); - - } - } - - @Override - protected String getBlobAsString(ResultSet rs, int col) throws SQLException - { - byte[] bytes; - if(_useBytesMethodsForBlob) - { - bytes = rs.getBytes(col); - return new String(bytes,UTF8_CHARSET); - } - else - { - Blob blob = rs.getBlob(col); - if(blob == null) - { - return null; - } - bytes = blob.getBytes(1, (int)blob.length()); - } - return new String(bytes, UTF8_CHARSET); - - } - - @Override - public Transaction newTransaction() - { - return new RecordedJDBCTransaction(); - } - - private class RecordedJDBCTransaction extends JDBCTransaction - { - private RecordedJDBCTransaction() - { - super(); - JDBCMessageStore.this._transactions.add(this); - } - - @Override - public void commitTran() - { - try - { - super.commitTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public StoreFuture commitTranAsync() - { - try - { - return super.commitTranAsync(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - - @Override - public void abortTran() - { - try - { - super.abortTran(); - } - finally - { - JDBCMessageStore.this._transactions.remove(this); - } - } - } - - @Override - public MessageStore getMessageStore() - { - return _messageStoreFacade; - } - - private class MessageStoreWrapper implements MessageStore - { - - @Override - public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) - { - JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings); - } - - @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) - { - return JDBCMessageStore.this.addMessage(metaData); - } - - @Override - public boolean isPersistent() - { - return JDBCMessageStore.this.isPersistent(); - } - - @Override - public Transaction newTransaction() - { - return JDBCMessageStore.this.newTransaction(); - } - - @Override - public void closeMessageStore() - { - JDBCMessageStore.this.closeMessageStore(); - } - - @Override - public void addEventListener(final EventListener eventListener, final Event... events) - { - JDBCMessageStore.this.addEventListener(eventListener, events); - } - - @Override - public void upgradeStoreStructure() throws StoreException - { - } - - @Override - public String getStoreLocation() - { - return JDBCMessageStore.this.getStoreLocation(); - } - - @Override - public void onDelete() - { - JDBCMessageStore.this.onDelete(); - } - - @Override - public void visitMessages(final MessageHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessages(handler); - } - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - JDBCMessageStore.this.visitMessageInstances(handler); - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - JDBCMessageStore.this.visitDistributedTransactions(handler); - } - } - -} diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java index 1dd39a8696..85e8f89dbe 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE) @@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost> @Override protected MessageStore createMessageStore() { - return new JDBCMessageStore().getMessageStore(); + return new GenericJDBCMessageStore(); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java index 2108b4e09a..ab8f4554cb 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.jdbc.JDBCMessageStore; +import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore; import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode; @ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false ) @@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB @Override protected DurableConfigurationStore createConfigurationStore() { - return new JDBCMessageStore(); + return new GenericJDBCConfigurationStore(); } @Override diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 1f03b7f75f..8261e93347 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase public void testOnDelete() throws Exception { - Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; + Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; assertTablesExist(expectedTables, true); getStore().closeMessageStore(); assertTablesExist(expectedTables, true); @@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL); + messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL); return messageStoreSettings; } @@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase @Override protected MessageStore createMessageStore() { - return (new JDBCMessageStore()).getMessageStore(); + return new GenericJDBCMessageStore(); } private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException |
