summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
committerKeith Wall <kwall@apache.org>2014-06-18 20:51:43 +0000
commit326b9560c14d1c30eb71c1396858791f9187d11e (patch)
tree34ba78548d48295e88b9e038f382bd8861f32500 /qpid/java
parent2622dda9c7d3267efd985b5ae5928b99063d2fa7 (diff)
downloadqpid-python-326b9560c14d1c30eb71c1396858791f9187d11e.tar.gz
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message and config store implementations.
* Message store implementations can now be used in isolation, which is useful when the user is using a JSON VirtualHostNode with another persistent store implementation. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603626 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java1013
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java1127
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java86
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java337
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java221
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java482
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java165
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java4
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java4
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java2
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java2
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java160
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java286
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java177
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java197
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java522
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java4
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java6
20 files changed, 2770 insertions, 2031 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
new file mode 100644
index 0000000000..fb256abbf5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
@@ -0,0 +1,1013 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.Module;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
+{
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
+
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+
+ private static final int DEFAULT_CONFIG_VERSION = 0;
+
+ public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME));
+
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME;
+
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+
+ private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
+
+ private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " where child_id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
+
+ private static final Module _module;
+ static
+ {
+ SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
+
+ final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
+ {
+ @Override
+ public void serialize(final ConfiguredObject value,
+ final JsonGenerator jgen,
+ final SerializerProvider provider)
+ throws IOException, JsonProcessingException
+ {
+ jgen.writeString(value.getId().toString());
+ }
+ };
+ module.addSerializer(ConfiguredObject.class, serializer);
+
+ _module = module;
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+ {
+ checkConfigurationStoreOpen();
+
+ try
+ {
+ handler.begin();
+ doVisitAllConfiguredObjectRecords(handler);
+ handler.end();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Cannot visit configured object records", e);
+ }
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
+ {
+ break;
+ }
+ }
+ }
+
+ protected abstract void checkConfigurationStoreOpen();
+
+ protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws StoreException
+ {
+ Connection connection = null;
+ try
+ {
+ connection = newConnection();
+
+ boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+ if(tableExists)
+ {
+ int configVersion = getConfigVersion(connection);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader read existing config version " + configVersion);
+ }
+
+ switch(configVersion)
+ {
+
+ case 7:
+ upgradeFromV7(parent);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot upgrade from configuration version : "
+ + configVersion);
+ }
+ }
+ }
+ catch (SQLException se)
+ {
+ throw new StoreException("Failed to upgrade database", se);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(connection, getLogger());
+ }
+
+ }
+
+ private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
+ {
+ @SuppressWarnings("serial")
+ Map<String, String> defaultExchanges = new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }};
+
+ Connection connection = newConnection();
+ try
+ {
+ String virtualHostName = parent.getName();
+ UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
+
+ String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION;
+
+ boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+ if(tableExists)
+ {
+ int configVersion = getConfigVersion(connection);
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader read existing config version " + configVersion);
+ }
+
+ stringifiedConfigVersion = "0." + configVersion;
+ }
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
+ virtualHostAttributes.put("name", virtualHostName);
+
+ ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
+ insertConfiguredObject(virtualHostRecord, connection);
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion);
+ }
+
+ Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
+ List<UUID> others = new ArrayList<UUID>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+
+ PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID id = UUID.fromString(rs.getString(1));
+ String objectType = rs.getString(2);
+ if ("VirtualHost".equals(objectType))
+ {
+ continue;
+ }
+ Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
+
+ if(objectType.endsWith("Binding"))
+ {
+ bindingsToUpdate.put(id,attributes);
+ }
+ else
+ {
+ if (objectType.equals("Exchange"))
+ {
+ defaultExchanges.remove((String)attributes.get("name"));
+ }
+ others.add(id);
+ }
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ for (UUID id : others)
+ {
+ stmt.setString(1, id.toString());
+ stmt.setString(2, "VirtualHost");
+ stmt.setString(3, virtualHostId.toString());
+ stmt.execute();
+ }
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Queue");
+ stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
+ stmt.execute();
+
+ stmt.setString(1, bindingEntry.getKey().toString());
+ stmt.setString(2,"Exchange");
+ stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
+ stmt.execute();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
+ Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+ exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+ exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+ exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+ Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord);
+ ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
+ insertConfiguredObject(exchangeRecord, connection);
+ }
+
+ stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+ {
+ stmt.setString(1, "Binding");
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt.setBinaryStream(2, bis, attributesAsBytes.length);
+ stmt.setString(3, bindingEntry.getKey().toString());
+ stmt.execute();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ if (tableExists)
+ {
+ dropConfigVersionTable(connection);
+ }
+
+ connection.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch(SQLException re)
+ {
+ }
+ throw e;
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ protected abstract Logger getLogger();
+
+ protected abstract String getSqlBlobType();
+
+ protected abstract String getSqlVarBinaryType(int size);
+
+ protected abstract String getSqlBigIntType();
+
+
+ protected void createOrOpenConfigurationStoreDatabase() throws StoreException
+ {
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ createConfiguredObjectsTable(conn);
+ createConfiguredObjectHierarchyTable(conn);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to open configuration tables", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+ }
+
+ private void dropConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(DROP_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return JdbcUtils.tableExists(tableName, conn);
+ }
+
+ private int getConfigVersion(Connection conn) throws SQLException
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+
+ @Override
+ public void create(ConfiguredObjectRecord object) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ insertConfiguredObject(object, conn);
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
+ }
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions enabled.
+ */
+ protected Connection newAutoCommitConnection() throws SQLException
+ {
+ final Connection connection = newConnection();
+ try
+ {
+ connection.setAutoCommit(true);
+ }
+ catch (SQLException sqlEx)
+ {
+
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+
+ return connection;
+ }
+
+ /**
+ * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+ * isolation and with auto-commit transactions disabled.
+ */
+ protected Connection newConnection() throws SQLException
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ connection.setAutoCommit(false);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+ catch (SQLException sqlEx)
+ {
+ try
+ {
+ connection.close();
+ }
+ finally
+ {
+ throw sqlEx;
+ }
+ }
+ return connection;
+ }
+
+ protected abstract Connection getConnection() throws SQLException;
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
+ {
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ boolean exists;
+ try
+ {
+ exists = rs.next();
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ // If we don't have any data in the result set then we can add this configured object
+ if (!exists)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+
+ writeHierarchy(configuredObject, conn);
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+
+ Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+ try
+ {
+
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : objects)
+ {
+ if(removeConfiguredObject(record.getId(), conn) != 0)
+ {
+ removed.add(record.getId());
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+ {
+ final int results;
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ stmt.setString(1, id.toString());
+ stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ return results;
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
+ {
+ updateConfiguredObject(record, createIfNecessary, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+ boolean createIfNecessary,
+ Connection conn)
+ throws SQLException, StoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ writeHierarchy(configuredObject, conn);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, parentEntry.getKey());
+ insertStmt.setString(3, parentEntry.getValue().getId().toString());
+
+ insertStmt.execute();
+ }
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
+
+ @Override
+ public void onDelete()
+ {
+ // TODO should probably check we are closed
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ List<String> tables = new ArrayList<String>();
+ tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
+
+ for (String tableName : tables)
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute("DROP TABLE " + tableName);
+ }
+ catch(SQLException e)
+ {
+ getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch(SQLException e)
+ {
+ getLogger().error("Exception while deleting store tables", e);
+ }
+ }
+
+ private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+ {
+
+ private final UUID _id;
+ private final String _type;
+ private final Map<String, Object> _attributes;
+ private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
+
+ private ConfiguredObjectRecordImpl(final UUID id,
+ final String type,
+ final Map<String, Object> attributes)
+ {
+ _id = id;
+ _type = type;
+ _attributes = Collections.unmodifiableMap(attributes);
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String getType()
+ {
+ return _type;
+ }
+
+ private void addParent(String parentType, ConfiguredObjectRecord parent)
+ {
+ _parents.put(parentType, parent);
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public Map<String, ConfiguredObjectRecord> getParents()
+ {
+ return Collections.unmodifiableMap(_parents);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents="
+ + _parents + "]";
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 24563bae61..7487315000 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -24,54 +24,32 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.Version;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.JsonSerializer;
-import org.codehaus.jackson.map.Module;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializerProvider;
-import org.codehaus.jackson.map.module.SimpleModule;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore
+public abstract class AbstractJDBCMessageStore implements MessageStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
- private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -82,16 +60,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
- private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
- private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
-
- private static final int DEFAULT_CONFIG_VERSION = 0;
-
- public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME));
public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME,
META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME,
- QUEUE_ENTRY_TABLE_NAME,
- XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
+ QUEUE_ENTRY_TABLE_NAME,
+ XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
private static final int DB_VERSION = 8;
@@ -103,19 +75,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
- private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
- private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME;
-
-
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
- + "( message_id, content ) values (?, ?)";
+ + "( message_id, content ) values (?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
+ + " WHERE message_id = ?";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
- + " WHERE message_id = ?";
+ + " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
private static final String SELECT_FROM_META_DATA =
@@ -126,282 +94,63 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
"queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
- + " WHERE format = ? and global_id = ? and branch_id = ?";
+ + " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
"SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
- private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id, object_type, attributes) VALUES (?,?,?)";
- private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " set object_type =?, attributes = ? where id = ?";
- private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
- + " where id = ?";
- private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
-
-
- private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
-
- private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " where child_id = ?";
- private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
-
- protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
-
- private static final Module _module;
- static
- {
- SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
-
- final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
- {
- @Override
- public void serialize(final ConfiguredObject value,
- final JsonGenerator jgen,
- final SerializerProvider provider)
- throws IOException, JsonProcessingException
- {
- jgen.writeString(value.getId().toString());
- }
- };
- module.addSerializer(ConfiguredObject.class, serializer);
-
- _module = module;
- }
protected final EventManager _eventManager = new EventManager();
- private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
- private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ protected abstract boolean isMessageStoreOpen();
- private boolean _initialized;
+ protected abstract void checkMessageStoreOpen();
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ protected void setMaximumMessageId()
{
- if (_configurationStoreOpen.compareAndSet(false, true))
+ visitMessages(new MessageHandler()
{
- initialiseIfNecessary(parent.getName(), storeSettings);
- try
- {
- createOrOpenConfigurationStoreDatabase();
- upgradeIfVersionTableExists(parent);
- }
- catch(SQLException e)
- {
- throw new StoreException("Cannot create databases or upgrade", e);
- }
- }
- }
-
- private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
- {
- if (!_initialized)
- {
- try
- {
- implementationSpecificConfiguration(virtualHostName, storeSettings);
- }
- catch (ClassNotFoundException e)
- {
- throw new StoreException("Cannot find driver class", e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unexpected exception occured", e);
- }
- _initialized = true;
- }
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
- {
- checkConfigurationStoreOpen();
-
- try
- {
- handler.begin();
- doVisitAllConfiguredObjectRecords(handler);
- handler.end();
- }
- catch (SQLException e)
- {
- throw new StoreException("Cannot visit configured object records", e);
- }
-
- }
-
- private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
- final ObjectMapper objectMapper = new ObjectMapper();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
- try
- {
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = getBlobAsString(rs, 3);
- final ConfiguredObjectRecordImpl configuredObjectRecord =
- new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes, Map.class));
- configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
- try
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
{
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- UUID childId = UUID.fromString(rs.getString(1));
- String parentType = rs.getString(2);
- UUID parentId = UUID.fromString(rs.getString(3));
-
- ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
- ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
- if(child != null && parent != null)
- {
- child.addParent(parentType, parent);
- }
- }
- }
- finally
+ long id = storedMessage.getMessageNumber();
+ if (_messageId.get() < id)
{
- rs.close();
+ _messageId.set(id);
}
+ return true;
}
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- conn.close();
- }
-
- for(ConfiguredObjectRecord record : configuredObjects.values())
- {
- boolean shoudlContinue = handler.handle(record);
- if (!shoudlContinue)
- {
- break;
- }
- }
- }
-
- private void checkConfigurationStoreOpen()
- {
- if (!_configurationStoreOpen.get())
- {
- throw new IllegalStateException("Configuration store is not open");
- }
+ });
}
- private void checkMessageStoreOpen()
+ protected void upgrade(ConfiguredObject<?> parent) throws StoreException
{
- if (!_messageStoreOpen.get())
- {
- throw new IllegalStateException("Message store is not open");
- }
- }
-
- private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
- throws SQLException {
- Connection conn = newAutoCommitConnection();
+ Connection conn = null;
try
{
+ conn = newAutoCommitConnection();
if (tableExists(DB_VERSION_TABLE_NAME, conn))
{
upgradeIfNecessary(parent);
}
}
- finally
+ catch (SQLException e)
{
- if (conn != null)
- {
- conn.close();
- }
+ throw new StoreException("Failed to upgrade database", e);
}
- }
-
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- if (_messageStoreOpen.compareAndSet(false, true))
+ finally
{
- initialiseIfNecessary(parent.getName(), messageStoreSettings);
- try
- {
- createOrOpenMessageStoreDatabase();
- upgradeIfNecessary(parent);
-
- visitMessages(new MessageHandler()
- {
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- long id = storedMessage.getMessageNumber();
- if (_messageId.get() < id)
- {
- _messageId.set(id);
- }
- return true;
- }
- });
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
- protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
+ private void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -423,7 +172,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
case 6:
upgradeFromV6();
case 7:
- upgradeFromV7(parent);
+ upgradeFromV7();
case DB_VERSION:
return;
default:
@@ -447,214 +196,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
- private void upgradeFromV6() throws SQLException
+ private void upgradeFromV7()
{
- updateDbVersion(7);
}
-
- private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
+ private void upgradeFromV6() throws SQLException
{
- @SuppressWarnings("serial")
- Map<String, String> defaultExchanges = new HashMap<String, String>()
- {{
- put("amq.direct", "direct");
- put("amq.topic", "topic");
- put("amq.fanout", "fanout");
- put("amq.match", "headers");
- }};
-
- Connection connection = newConnection();
- try
- {
- String virtualHostName = parent.getName();
- UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
-
- String stringifiedConfigVersion = BrokerModel.MODEL_VERSION;
- boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
- if(tableExists)
- {
- int configVersion = getConfigVersion(connection);
- if (getLogger().isDebugEnabled())
- {
- getLogger().debug("Upgrader read existing config version " + configVersion);
- }
-
- stringifiedConfigVersion = "0." + configVersion;
- }
-
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
- virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
- virtualHostAttributes.put("name", virtualHostName);
-
- ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
- insertConfiguredObject(virtualHostRecord, connection);
-
- if (getLogger().isDebugEnabled())
- {
- getLogger().debug("Upgrader created VirtualHost configuration entry with config version " + stringifiedConfigVersion);
- }
-
- Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
- List<UUID> others = new ArrayList<UUID>();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
-
- PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
- try
- {
- ResultSet rs = stmt.executeQuery();
- try
- {
- while (rs.next())
- {
- UUID id = UUID.fromString(rs.getString(1));
- String objectType = rs.getString(2);
- if ("VirtualHost".equals(objectType))
- {
- continue;
- }
- Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
-
- if(objectType.endsWith("Binding"))
- {
- bindingsToUpdate.put(id,attributes);
- }
- else
- {
- if (objectType.equals("Exchange"))
- {
- defaultExchanges.remove((String)attributes.get("name"));
- }
- others.add(id);
- }
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- for (UUID id : others)
- {
- stmt.setString(1, id.toString());
- stmt.setString(2, "VirtualHost");
- stmt.setString(3, virtualHostId.toString());
- stmt.execute();
- }
- for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
- {
- stmt.setString(1, bindingEntry.getKey().toString());
- stmt.setString(2,"Queue");
- stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
- stmt.execute();
-
- stmt.setString(1, bindingEntry.getKey().toString());
- stmt.setString(2,"Exchange");
- stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
- stmt.execute();
- }
- }
- finally
- {
- stmt.close();
- }
-
- for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
- {
- UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
- Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
- exchangeAttributes.put("name", defaultExchangeEntry.getKey());
- exchangeAttributes.put("type", defaultExchangeEntry.getValue());
- exchangeAttributes.put("lifetimePolicy", "PERMANENT");
- Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost", virtualHostRecord);
- ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
- insertConfiguredObject(exchangeRecord, connection);
- }
-
- stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
- {
- stmt.setString(1, "Binding");
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt.setBinaryStream(2, bis, attributesAsBytes.length);
- stmt.setString(3, bindingEntry.getKey().toString());
- stmt.execute();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- finally
- {
- stmt.close();
- }
- stmt = connection.prepareStatement(UPDATE_DB_VERSION);
- try
- {
- stmt.setInt(1, 8);
- stmt.execute();
- }
- finally
- {
- stmt.close();
- }
-
- if (tableExists)
- {
- dropConfigVersionTable(connection);
- }
-
- connection.commit();
- }
- catch(SQLException e)
- {
- try
- {
- connection.rollback();
- }
- catch(SQLException re)
- {
- }
- throw e;
- }
- finally
- {
- connection.close();
- }
+ updateDbVersion(7);
}
private void updateDbVersion(int newVersion) throws SQLException
@@ -680,44 +228,36 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException;
+ protected abstract Logger getLogger();
- abstract protected Logger getLogger();
+ protected abstract String getSqlBlobType();
- abstract protected String getSqlBlobType();
-
- abstract protected String getSqlVarBinaryType(int size);
-
- abstract protected String getSqlBigIntType();
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- // TODO acquire connection to the database using the attribute of the parents,
- // run the upgrader in a transaction, close the connection.
- }
-
- protected void createOrOpenMessageStoreDatabase() throws SQLException
- {
- Connection conn = newAutoCommitConnection();
+ protected abstract String getSqlVarBinaryType(int size);
- createVersionTable(conn);
- createQueueEntryTable(conn);
- createMetaDataTable(conn);
- createMessageContentTable(conn);
- createXidTable(conn);
- createXidActionTable(conn);
- conn.close();
- }
+ protected abstract String getSqlBigIntType();
- protected void createOrOpenConfigurationStoreDatabase() throws SQLException
+ protected void createOrOpenMessageStoreDatabase() throws StoreException
{
- Connection conn = newAutoCommitConnection();
-
- createConfiguredObjectsTable(conn);
- createConfiguredObjectHierarchyTable(conn);
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
- conn.close();
+ createVersionTable(conn);
+ createQueueEntryTable(conn);
+ createMetaDataTable(conn);
+ createMessageContentTable(conn);
+ createXidTable(conn);
+ createXidActionTable(conn);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create message store tables", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
}
private void createVersionTable(final Connection conn) throws SQLException
@@ -747,56 +287,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- private void dropConfigVersionTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(DROP_CONFIG_VERSION_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createConfiguredObjectsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
- {
- if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
- + " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -805,7 +295,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
try
{
stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
- + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
+ + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
}
finally
{
@@ -909,81 +399,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
- DatabaseMetaData metaData = conn.getMetaData();
- ResultSet rs = metaData.getTables(null, null, "%", null);
-
- try
- {
-
- while(rs.next())
- {
- final String table = rs.getString(3);
- if(tableName.equalsIgnoreCase(table))
- {
- return true;
- }
- }
- return false;
- }
- finally
- {
- rs.close();
- }
- }
-
- private int getConfigVersion(Connection conn) throws SQLException
- {
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
- try
- {
-
- if(rs.next())
- {
- return rs.getInt(1);
- }
- return DEFAULT_CONFIG_VERSION;
- }
- finally
- {
- rs.close();
- }
-
- }
- finally
- {
- stmt.close();
- }
-
- }
-
- public void closeMessageStore()
- {
- if (_messageStoreOpen.compareAndSet(true, false))
- {
- if (!_configurationStoreOpen.get())
- {
- doClose();
- }
- }
+ return JdbcUtils.tableExists(tableName, conn);
}
@Override
- public void closeConfigurationStore()
- {
- if (_configurationStoreOpen.compareAndSet(true, false))
- {
- if (!_messageStoreOpen.get())
- {
- doClose();
- }
- }
- }
-
- protected abstract void doClose();
-
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
checkMessageStoreOpen();
@@ -1058,30 +477,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
-
- @Override
- public void create(ConfiguredObjectRecord object) throws StoreException
- {
- checkConfigurationStoreOpen();
- try
- {
- Connection conn = newConnection();
- try
- {
- insertConfiguredObject(object, conn);
- conn.commit();
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error creating ConfiguredObject " + object);
- }
- }
-
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
@@ -1137,6 +532,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
protected abstract Connection getConnection() throws SQLException;
+ @Override
public Transaction newTransaction()
{
checkMessageStoreOpen();
@@ -1154,13 +550,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if (getLogger().isDebugEnabled())
{
getLogger().debug("Enqueuing message "
- + messageId
- + " on queue "
- + queue.getName()
- + " with id " + queue.getId()
- + " [Connection"
- + conn
- + "]");
+ + messageId
+ + " on queue "
+ + queue.getName()
+ + " with id " + queue.getId()
+ + " [Connection"
+ + conn
+ + "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
@@ -1180,7 +576,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
- + " to database", e);
+ + " to database", e);
}
}
@@ -1205,13 +601,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if(results != 1)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ + " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ + " with id " + queue.getId());
}
}
finally
@@ -1224,7 +620,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
- + " with id " + queue.getId() + " from database", e);
+ + " with id " + queue.getId() + " from database", e);
}
}
@@ -1420,7 +816,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
- throws SQLException
+ throws SQLException
{
if(getLogger().isDebugEnabled())
{
@@ -1606,12 +1002,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
catch (SQLException e)
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
- closePreparedStatement(stmt);
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
}
}
@@ -1640,7 +1036,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
if (offset > size)
{
throw new StoreException("Offset " + offset + " is greater than message size " + size
- + " for message id " + messageId + "!");
+ + " for message id " + messageId + "!");
}
@@ -1662,13 +1058,14 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closePreparedStatement(stmt);
- closeConnection(conn);
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public boolean isPersistent()
{
return true;
@@ -1904,7 +1301,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, AbstractJDBCMessageStore.this.getLogger());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@@ -1927,7 +1324,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
{
storeMetaData(conn, _messageId, _metaData);
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
{
@@ -1948,310 +1345,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
}
- protected void closeConnection(final Connection conn)
- {
- if(conn != null)
- {
- try
- {
- conn.close();
- }
- catch (SQLException e)
- {
- getLogger().error("Problem closing connection", e);
- }
- }
- }
-
- protected void closePreparedStatement(final PreparedStatement stmt)
- {
- if (stmt != null)
- {
- try
- {
- stmt.close();
- }
- catch(SQLException e)
- {
- getLogger().error("Problem closing prepared statement", e);
- }
- }
- }
-
+ @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
}
- private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
- {
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- boolean exists;
- try
- {
- exists = rs.next();
-
- }
- finally
- {
- rs.close();
- }
- // If we don't have any data in the result set then we can add this configured object
- if (!exists)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
-
- writeHierarchy(configuredObject, conn);
- }
-
- }
- finally
- {
- stmt.close();
- }
-
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- }
-
@Override
- public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
- {
- checkConfigurationStoreOpen();
-
- Collection<UUID> removed = new ArrayList<UUID>(objects.length);
- try
- {
-
- Connection conn = newAutoCommitConnection();
- try
- {
- for(ConfiguredObjectRecord record : objects)
- {
- if(removeConfiguredObject(record.getId(), conn) != 0)
- {
- removed.add(record.getId());
- }
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
- }
- return removed.toArray(new UUID[removed.size()]);
- }
-
- private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
- {
- final int results;
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
- try
- {
- stmt.setString(1, id.toString());
- results = stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
- stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- stmt.setString(1, id.toString());
- stmt.executeUpdate();
- }
- finally
- {
- stmt.close();
- }
-
- return results;
- }
-
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
- {
- checkConfigurationStoreOpen();
- try
- {
- Connection conn = newConnection();
- try
- {
- for(ConfiguredObjectRecord record : records)
- {
- updateConfiguredObject(record, createIfNecessary, conn);
- }
- conn.commit();
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
- }
- }
-
- private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
- boolean createIfNecessary,
- Connection conn)
- throws SQLException, StoreException
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
- {
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- if (rs.next())
- {
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
- }
- else if(createIfNecessary)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- writeHierarchy(configuredObject, conn);
- }
- }
- finally
- {
- rs.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- finally
- {
- stmt.close();
- }
- }
-
- private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
- try
- {
- for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
- {
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, parentEntry.getKey());
- insertStmt.setString(3, parentEntry.getValue().getId().toString());
-
- insertStmt.execute();
- }
- }
- finally
- {
- insertStmt.close();
- }
- }
-
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2298,10 +1398,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2342,10 +1443,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
+ @Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2422,8 +1524,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()])))
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
{
break;
}
@@ -2437,13 +1539,10 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
finally
{
- closeConnection(conn);
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
-
- protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
-
protected abstract void storedSizeChange(int storeSizeIncrease);
@Override
@@ -2456,7 +1555,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
try
{
List<String> tables = new ArrayList<String>();
- tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
tables.addAll(MESSAGE_STORE_TABLE_NAMES);
for (String tableName : tables)
@@ -2488,57 +1586,4 @@ abstract public class AbstractJDBCMessageStore implements MessageStoreProvider,
}
- private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
- {
-
- private final UUID _id;
- private final String _type;
- private final Map<String, Object> _attributes;
- private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
-
- private ConfiguredObjectRecordImpl(final UUID id,
- final String type,
- final Map<String, Object> attributes)
- {
- _id = id;
- _type = type;
- _attributes = Collections.unmodifiableMap(attributes);
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public String getType()
- {
- return _type;
- }
-
- private void addParent(String parentType, ConfiguredObjectRecord parent)
- {
- _parents.put(parentType, parent);
- }
-
- @Override
- public Map<String, Object> getAttributes()
- {
- return _attributes;
- }
-
- @Override
- public Map<String, ConfiguredObjectRecord> getParents()
- {
- return Collections.unmodifiableMap(_parents);
- }
-
- @Override
- public String toString()
- {
- return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _parents="
- + _parents + "]";
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java
new file mode 100644
index 0000000000..a26e478a50
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+
+import org.apache.log4j.Logger;
+
+public class JdbcUtils
+{
+ public static void closeConnection(final Connection conn, final Logger logger)
+ {
+ if(conn != null)
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ logger.error("Problem closing connection", e);
+ }
+ }
+ }
+
+ public static void closePreparedStatement(final PreparedStatement stmt, final Logger logger)
+ {
+ if (stmt != null)
+ {
+ try
+ {
+ stmt.close();
+ }
+ catch(SQLException e)
+ {
+ logger.error("Problem closing prepared statement", e);
+ }
+ }
+ }
+
+ public static boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ DatabaseMetaData metaData = conn.getMetaData();
+ ResultSet rs = metaData.getTables(null, null, "%", null);
+
+ try
+ {
+
+ while(rs.next())
+ {
+ final String table = rs.getString(3);
+ if(tableName.equalsIgnoreCase(table))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
new file mode 100644
index 0000000000..0a121ad476
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
@@ -0,0 +1,337 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.AbstractJDBCMessageStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.JdbcUtils;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreException;
+
+public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
+{
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
+
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public final void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ DerbyUtils.loadDerbyDriver();
+
+ doOpen(parent, messageStoreSettings);
+
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number
+ ? ((Number) overfullAttr).longValue()
+ : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number
+ ? ((Number) underfullAttr).longValue()
+ : Long.parseLong(underfullAttr.toString());
+
+ if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
+
+ createOrOpenMessageStoreDatabase();
+ setInitialSize();
+ setMaximumMessageId();
+ }
+ }
+
+ protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings);
+
+ @Override
+ public final void upgradeStoreStructure() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ upgrade(_parent);
+ }
+
+ @Override
+ public final void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ doClose();
+ }
+ }
+
+ protected abstract void doClose();
+
+ @Override
+ protected boolean isMessageStoreOpen()
+ {
+ return _messageStoreOpen.get();
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return "varchar("+size+") for bit data";
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return "bigint";
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ return DerbyUtils.getBlobAsBytes(rs, col);
+ }
+
+ @Override
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return DerbyUtils.tableExists(tableName, conn);
+ }
+
+ @Override
+ protected void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ throw new StoreException("Exception while processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void setInitialSize()
+ {
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ catch (SQLException e)
+ {
+ getLogger().error("Unable to set initial store size", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closePreparedStatement(cs, getLogger());
+ }
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
new file mode 100644
index 0000000000..540e92fac7
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
@@ -0,0 +1,221 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.AbstractJDBCConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Implementation of a DurableConfigurationStore backed by Apache Derby
+ * that also provides a MessageStore.
+ */
+public class DerbyConfigurationStore extends AbstractJDBCConfigurationStore
+ implements MessageStoreProvider, DurableConfigurationStore
+{
+ private static final Logger LOGGER = Logger.getLogger(DerbyConfigurationStore.class);
+
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ private final MessageStoreWrapper _messageStore = new MessageStoreWrapper();
+
+ private String _connectionURL;
+ private String _storeLocation;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+ DerbyUtils.loadDerbyDriver();
+
+ String databasePath = (String) storeSettings.get(MessageStore.STORE_PATH);
+
+ _storeLocation = databasePath;
+ _connectionURL = DerbyUtils.createConnectionUrl(parent.getName(), databasePath);
+
+ createOrOpenConfigurationStoreDatabase();
+ }
+ }
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ upgradeIfNecessary(_parent);
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ checkConfigurationStoreOpen();
+ return DriverManager.getConnection(_connectionURL);
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_messageStore.isMessageStoreOpen())
+ {
+ throw new IllegalStateException("Cannot close the store as the provided message store is still open");
+ }
+
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ DerbyUtils.shutdownDatabase(_connectionURL);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error closing configuration store", e);
+ }
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return "blob";
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return "varchar("+size+") for bit data";
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return "bigint";
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ return DerbyUtils.getBlobAsString(rs, col);
+ }
+
+ @Override
+ public void onDelete()
+ {
+ if (_messageStore.isMessageStoreOpen())
+ {
+ throw new IllegalStateException("Cannot delete the store as the provided message store is still open");
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting store " + _storeLocation);
+ }
+
+ try
+ {
+ DerbyUtils.deleteDatabaseLocation(_storeLocation);
+ }
+ catch (StoreException se)
+ {
+ LOGGER.debug("Failed to delete the store at location " + _storeLocation);
+ }
+ finally
+ {
+ _storeLocation = null;
+ }
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+ @Override
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ return DerbyUtils.tableExists(tableName, conn);
+ }
+
+ @Override
+ protected void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ private class MessageStoreWrapper extends AbstractDerbyMessageStore
+ {
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ checkMessageStoreOpen();
+ return DerbyConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return DerbyConfigurationStore.this._storeLocation;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return DerbyConfigurationStore.this.getLogger();
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 6f87e81ba1..9e2a2f63d4 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -21,522 +21,96 @@
package org.apache.qpid.server.store.derby;
-import java.io.File;
-import java.sql.Blob;
-import java.sql.CallableStatement;
import java.sql.Connection;
-import java.sql.Driver;
import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.AbstractJDBCMessageStore;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.util.FileUtils;
/**
- * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
- * mechanism.
- *
+ * Implementation of a MessageStore backed by Apache Derby.
*/
-public class DerbyMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider,
- DurableConfigurationStore
+public class DerbyMessageStore extends AbstractDerbyMessageStore
{
+ private static final Logger LOGGER = Logger.getLogger(DerbyMessageStore.class);
- private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
-
- private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
- public static final String MEMORY_STORE_LOCATION = ":memory:";
-
- private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
-
- public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
-
- public static final String TYPE = "DERBY";
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
-
- protected String _connectionURL;
-
+ private String _connectionURL;
private String _storeLocation;
- private Class<Driver> _driverClass;
-
- private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
-
- public DerbyMessageStore()
- {
- }
-
- protected Logger getLogger()
- {
- return _logger;
- }
@Override
- protected String getSqlBlobType()
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
{
- return "blob";
- }
+ String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
+ String name = parent.getName();
- @Override
- protected String getSqlVarBinaryType(int size)
- {
- return "varchar("+size+") for bit data";
+ _storeLocation = databasePath;
+ _connectionURL = DerbyUtils.createConnectionUrl(name, databasePath);
}
@Override
- protected String getSqlBigIntType()
+ protected Connection getConnection() throws SQLException
{
- return "bigint";
+ checkMessageStoreOpen();
+ return DriverManager.getConnection(_connectionURL);
}
+ @Override
protected void doClose()
{
try
{
- Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
- // Shouldn't reach this point - shutdown=true should throw SQLException
- conn.close();
- getLogger().error("Unable to shut down the store");
+ DerbyUtils.shutdownDatabase(_connectionURL);
}
catch (SQLException e)
{
- if (e.getSQLState().equalsIgnoreCase(DerbyMessageStore.DERBY_SINGLE_DB_SHUTDOWN_CODE))
- {
- //expected and represents a clean shutdown of this database only, do nothing.
- }
- else
- {
- getLogger().error("Exception whilst shutting down the store: " + e);
- throw new StoreException("Error closing message store", e);
- }
+ throw new StoreException("Error closing configuration store", e);
}
}
@Override
- protected void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings)
- throws ClassNotFoundException
- {
- //Update to pick up QPID_WORK and use that as the default location not just derbyDB
- _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
-
- String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);;
-
- if(databasePath == null)
- {
- databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
- }
-
- if(!MEMORY_STORE_LOCATION.equals(databasePath))
- {
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
- }
-
- _storeLocation = databasePath;
-
- Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
- Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
-
- _persistentSizeHighThreshold = overfullAttr == null ? -1l :
- overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
- _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
- underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
-
- //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby" + (databasePath.equals(MEMORY_STORE_LOCATION) ? databasePath: ":" + databasePath+ "/") + name + ";create=true";
-
- setInitialSize();
-
- }
-
- private void setInitialSize()
- {
- Connection conn = null;
- try
- {
-
-
- try
- {
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
-
-
- }
- }
- }
- catch (SQLException e)
- {
- getLogger().error("Unable to set initial store size", e);
- }
- }
-
- protected String getBlobAsString(ResultSet rs, int col) throws SQLException
- {
- Blob blob = rs.getBlob(col);
- if(blob == null)
- {
- return null;
- }
- byte[] bytes = blob.getBytes(1, (int)blob.length());
- return new String(bytes, UTF8_CHARSET);
- }
-
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
- {
- Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- }
-
-
- protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
- {
- PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
- try
- {
- stmt.setString(1, tableName);
- ResultSet rs = stmt.executeQuery();
- try
- {
- return rs.next();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
-
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- protected synchronized void storedSizeChange(final int delta)
+ public void onDelete()
{
- if(getPersistentSizeHighThreshold() > 0)
+ if (isMessageStoreOpen())
{
- synchronized(this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 3*delta;
-
- Connection conn = null;
- try
- {
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk(conn);
-
- _totalStoreSize = getSizeOnDisk(conn);
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new StoreException("Exception while processing store size change", e);
- }
- }
+ throw new IllegalStateException("Cannot delete the store as the message store is still open");
}
- }
-
- private void reduceSizeOnDisk(Connection conn)
- {
- CallableStatement cs = null;
- PreparedStatement stmt = null;
- try
- {
- String tableQuery =
- "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
- stmt = conn.prepareStatement(tableQuery);
- ResultSet rs = null;
- List<String> schemas = new ArrayList<String>();
- List<String> tables = new ArrayList<String>();
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- schemas.add(rs.getString(1));
- tables.add(rs.getString(2));
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
-
- cs = conn.prepareCall
- ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
-
- for(int i = 0; i < schemas.size(); i++)
- {
- cs.setString(1, schemas.get(i));
- cs.setString(2, tables.get(i));
- cs.setShort(3, (short) 0);
- cs.execute();
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new StoreException("Error reducing on disk size", e);
- }
- finally
+ if (LOGGER.isDebugEnabled())
{
- closePreparedStatement(stmt);
- closePreparedStatement(cs);
+ LOGGER.debug("Deleting store " + _storeLocation);
}
- }
-
- private long getSizeOnDisk(Connection conn)
- {
- PreparedStatement stmt = null;
try
{
- String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
- " FROM " +
- " SYS.SYSTABLES systabs," +
- " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
- " WHERE systabs.tabletype = 'T'";
-
- stmt = conn.prepareStatement(sizeQuery);
-
- ResultSet rs = null;
- long size = 0l;
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- size = rs.getLong(1);
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
- return size;
-
+ DerbyUtils.deleteDatabaseLocation(_storeLocation);
}
- catch (SQLException e)
+ catch (StoreException se)
{
- closeConnection(conn);
- throw new StoreException("Error establishing on disk size", e);
+ LOGGER.debug("Failed to delete the store at location " + _storeLocation);
}
finally
{
- closePreparedStatement(stmt);
+ _storeLocation = null;
}
-
- }
-
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
}
@Override
- public void onDelete()
+ protected Logger getLogger()
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Deleting store " + _storeLocation);
- }
-
- if (MEMORY_STORE_LOCATION.equals(_storeLocation))
- {
- return;
- }
-
- if (_storeLocation != null)
- {
- File location = new File(_storeLocation);
- if (location.exists())
- {
- if (!FileUtils.delete(location, true))
- {
- _logger.error("Cannot delete " + _storeLocation);
- }
- }
- }
+ return LOGGER;
}
- protected Connection getConnection() throws SQLException
- {
- return DriverManager.getConnection(_connectionURL);
- }
@Override
- public MessageStore getMessageStore()
+ public String getStoreLocation()
{
- return _messageStoreFacade;
+ return _storeLocation;
}
- private class MessageStoreWrapper implements MessageStore
- {
-
- @Override
- public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
- {
- DerbyMessageStore.this.openMessageStore(parent, messageStoreSettings);
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return DerbyMessageStore.this.addMessage(metaData);
- }
-
- @Override
- public boolean isPersistent()
- {
- return DerbyMessageStore.this.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return DerbyMessageStore.this.newTransaction();
- }
-
- @Override
- public void closeMessageStore()
- {
- DerbyMessageStore.this.closeMessageStore();
- }
-
- @Override
- public void addEventListener(final EventListener eventListener, final Event... events)
- {
- DerbyMessageStore.this.addEventListener(eventListener, events);
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- }
-
- @Override
- public String getStoreLocation()
- {
- return DerbyMessageStore.this.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- DerbyMessageStore.this.onDelete();
- }
-
- @Override
- public void visitMessages(final MessageHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- DerbyMessageStore.this.visitDistributedTransactions(handler);
- }
- }
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
new file mode 100644
index 0000000000..b0f4a137f2
--- /dev/null
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
@@ -0,0 +1,165 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.derby;
+
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.util.FileUtils;
+
+public class DerbyUtils
+{
+ public static final String MEMORY_STORE_LOCATION = ":memory:";
+ public static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+ private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+ private static final String TABLE_EXISTENCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+
+ public static void loadDerbyDriver()
+ {
+ try
+ {
+ Class<Driver> driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new StoreException("Failed to load driver " + SQL_DRIVER_NAME, e);
+ }
+ }
+
+ public static String createConnectionUrl(final String name, final String databasePath)
+ {
+ // Derby wont use an existing directory, so we append parent name
+ if (MEMORY_STORE_LOCATION.equals(databasePath))
+ {
+ return "jdbc:derby:" + MEMORY_STORE_LOCATION + "/" + name + ";create=true";
+ }
+ else
+ {
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
+ {
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path "
+ + environmentPath
+ + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
+ }
+ return "jdbc:derby:" + databasePath + "/" + name + ";create=true";
+ }
+
+ }
+
+ public static void shutdownDatabase(String connectionURL) throws SQLException
+ {
+ try
+ {
+ Connection conn = DriverManager.getConnection(connectionURL + ";shutdown=true");
+ // Shouldn't reach this point - shutdown=true should throw SQLException
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ if (e.getSQLState().equalsIgnoreCase(DerbyUtils.DERBY_SINGLE_DB_SHUTDOWN_CODE))
+ {
+ //expected and represents a clean shutdown of this database only, do nothing.
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ public static void deleteDatabaseLocation(String storeLocation)
+ {
+ if (MEMORY_STORE_LOCATION.equals(storeLocation))
+ {
+ return;
+ }
+
+ if (storeLocation != null)
+ {
+ File location = new File(storeLocation);
+ if (location.exists())
+ {
+ if (!FileUtils.delete(location, true))
+ {
+ throw new StoreException("Failed to delete the store at location : " + storeLocation);
+ }
+ }
+ }
+ }
+
+ public static String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ byte[] bytes = blob.getBytes(1, (int) blob.length());
+ return new String(bytes, UTF8_CHARSET);
+ }
+
+ protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+ }
+
+ public static boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTENCE_QUERY);
+ try
+ {
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ return rs.next();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+
+}
+
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
index 31c3f7c944..fc67d2fa50 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
@@ -45,7 +45,7 @@ public class DerbyVirtualHost extends AbstractVirtualHost<DerbyVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new DerbyMessageStore().getMessageStore();
+ return new DerbyMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
index ffc19832fa..340d046033 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.derby.DerbyMessageStore;
+import org.apache.qpid.server.store.derby.DerbyConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.FileBasedVirtualHostNode;
@@ -49,7 +49,7 @@ public class DerbyVirtualHostNode extends AbstractStandardVirtualHostNode<DerbyV
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new DerbyMessageStore();
+ return new DerbyConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
index aaf65e9ee0..08c421b606 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
@@ -26,9 +26,9 @@ public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigura
{
@Override
- protected DerbyMessageStore createConfigStore() throws Exception
+ protected DerbyConfigurationStore createConfigStore() throws Exception
{
- return new DerbyMessageStore();
+ return new DerbyConfigurationStore();
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index 1d35b9ef83..ba7ae26292 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -50,7 +50,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
@Override
protected MessageStore createStore() throws Exception
{
- return (new DerbyMessageStore()).getMessageStore();
+ return new DerbyMessageStore();
}
@Override
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 4594b7f223..9a2d945494 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -83,7 +83,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return (new DerbyMessageStore()).getMessageStore();
+ return new DerbyMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
new file mode 100644
index 0000000000..bd245fa4f4
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+
+public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.server.store.AbstractJDBCMessageStore
+{
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
+ private final List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<>();
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public final void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+
+ doOpen(parent, storeSettings);
+
+ createOrOpenMessageStoreDatabase();
+ setMaximumMessageId();
+ }
+ }
+
+ protected abstract void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings)
+ throws StoreException;
+
+ @Override
+ public final void upgradeStoreStructure() throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ upgrade(_parent);
+ }
+
+ @Override
+ public final void closeMessageStore()
+ {
+ if (_messageStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
+ }
+ finally
+ {
+ doClose();
+ }
+
+ }
+ }
+
+ protected abstract void doClose();
+
+ protected boolean isMessageStoreOpen()
+ {
+ return _messageStoreOpen.get();
+ }
+
+ @Override
+ protected void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
+
+ @Override
+ protected void storedSizeChange(int contentSize)
+ {
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new RecordedJDBCTransaction();
+ }
+
+
+ private class RecordedJDBCTransaction extends JDBCTransaction
+ {
+ private RecordedJDBCTransaction()
+ {
+ super();
+ GenericAbstractJDBCMessageStore.this._transactions.add(this);
+ }
+
+ @Override
+ public void commitTran()
+ {
+ try
+ {
+ super.commitTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public StoreFuture commitTranAsync()
+ {
+ try
+ {
+ return super.commitTranAsync();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+
+ @Override
+ public void abortTran()
+ {
+ try
+ {
+ super.abortTran();
+ }
+ finally
+ {
+ GenericAbstractJDBCMessageStore.this._transactions.remove(this);
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
new file mode 100644
index 0000000000..7bafe5a859
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.nio.charset.Charset;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a DurableConfigurationStore backed by Generic JDBC Database
+ * that also provides a MessageStore.
+ */
+public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStore implements MessageStoreProvider
+{
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private static final Logger LOGGER = Logger.getLogger(GenericJDBCConfigurationStore.class);
+
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
+ private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+ private ConfiguredObject<?> _parent;
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ _parent = parent;
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+
+ JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = JDBCDetails.getDefaultDetails();
+ }
+
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ LOGGER.warn("Unknown connection pool type: "
+ + connectionPoolType
+ + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ createOrOpenConfigurationStoreDatabase();
+ }
+ }
+
+ @Override
+ public void upgradeStoreStructure() throws StoreException
+ {
+ checkConfigurationStoreOpen();
+ upgradeIfNecessary(_parent);
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ protected String getBlobAsString(ResultSet rs, int col) throws SQLException
+ {
+ byte[] bytes;
+ if(_useBytesMethodsForBlob)
+ {
+ bytes = rs.getBytes(col);
+ return new String(bytes,UTF8_CHARSET);
+ }
+ else
+ {
+ Blob blob = rs.getBlob(col);
+ if(blob == null)
+ {
+ return null;
+ }
+ bytes = blob.getBytes(1, (int)blob.length());
+ }
+ return new String(bytes, UTF8_CHARSET);
+
+ }
+
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ protected void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return LOGGER;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStoreFacade;
+ }
+
+ private class MessageStoreWrapper extends GenericAbstractJDBCMessageStore
+ {
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getConnection();
+ }
+
+ @Override
+ protected void doClose()
+ {
+ // Nothing to do, store provided by DerbyConfigurationStore
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return GenericJDBCConfigurationStore.this._connectionURL;
+ }
+
+ @Override
+ protected Logger getLogger()
+ {
+ return GenericJDBCConfigurationStore.this.getLogger();
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBlobType();
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return GenericJDBCConfigurationStore.this.getSqlVarBinaryType(size);
+ }
+
+ @Override
+ protected String getSqlBigIntType()
+ {
+ return GenericJDBCConfigurationStore.this.getSqlBigIntType();
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException
+ {
+ return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col);
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
new file mode 100644
index 0000000000..dad4432183
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -0,0 +1,177 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.jdbc;
+
+
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.MapValueConverter;
+
+/**
+ * Implementation of a MessageStore backed by a Generic JDBC Database.
+ */
+public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
+{
+
+ private static final Logger _logger = Logger.getLogger(GenericJDBCMessageStore.class);
+
+ public static final String TYPE = "JDBC";
+ public static final String CONNECTION_URL = "connectionUrl";
+ public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
+ public static final String JDBC_BIG_INT_TYPE = "bigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
+ public static final String JDBC_BLOB_TYPE = "blobType";
+
+ protected String _connectionURL;
+ private ConnectionProvider _connectionProvider;
+
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigIntType;
+ private boolean _useBytesMethodsForBlob;
+
+
+ @Override
+ protected void doOpen(final ConfiguredObject<?> parent, final Map<String, Object> storeSettings) throws StoreException
+ {
+ _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
+
+ org.apache.qpid.server.store.jdbc.JDBCDetails details = null;
+
+ String[] components = _connectionURL.split(":", 3);
+ if(components.length >= 2)
+ {
+ String vendor = components[1];
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDetails(vendor);
+ }
+
+ if(details == null)
+ {
+ getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
+
+ details = org.apache.qpid.server.store.jdbc.JDBCDetails.getDefaultDetails();
+ }
+
+
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE,
+ storeSettings,
+ details.getBigintType());
+
+ Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
+ String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
+
+ JDBCConnectionProviderFactory connectionProviderFactory =
+ JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
+ if(connectionProviderFactory == null)
+ {
+ _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
+ connectionProviderFactory = new DefaultConnectionProviderFactory();
+ }
+
+ try
+ {
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Failed to create connection provider for " + _connectionURL);
+ }
+
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException
+ {
+ return _connectionProvider.getConnection();
+ }
+
+ protected void doClose()
+ {
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
+ }
+
+
+ @Override
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
+
+ @Override
+ protected String getSqlBlobType()
+ {
+ return _blobType;
+ }
+
+ @Override
+ protected String getSqlVarBinaryType(int size)
+ {
+ return String.format(_varBinaryType, size);
+ }
+
+ @Override
+ protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+ {
+ if(_useBytesMethodsForBlob)
+ {
+ return rs.getBytes(col);
+ }
+ else
+ {
+ Blob dataAsBlob = rs.getBlob(col);
+ return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+
+ }
+ }
+
+ @Override
+ public String getSqlBigIntType()
+ {
+ return _bigIntType;
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return _connectionURL;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
new file mode 100644
index 0000000000..6cf1413b83
--- /dev/null
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.store.jdbc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JDBCDetails
+{
+
+ private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<>();
+
+ private static JDBCDetails DERBY_DETAILS =
+ new JDBCDetails("derby",
+ "blob",
+ "varchar(%d) for bit data",
+ "bigint",
+ false);
+
+ private static JDBCDetails POSTGRESQL_DETAILS =
+ new JDBCDetails("postgresql",
+ "bytea",
+ "bytea",
+ "bigint",
+ true);
+
+ private static JDBCDetails MYSQL_DETAILS =
+ new JDBCDetails("mysql",
+ "blob",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails SYBASE_DETAILS =
+ new JDBCDetails("sybase",
+ "image",
+ "varbinary(%d)",
+ "bigint",
+ false);
+
+
+ private static JDBCDetails ORACLE_DETAILS =
+ new JDBCDetails("oracle",
+ "blob",
+ "raw(%d)",
+ "number",
+ false);
+
+
+ static
+ {
+
+ addDetails(DERBY_DETAILS);
+ addDetails(POSTGRESQL_DETAILS);
+ addDetails(MYSQL_DETAILS);
+ addDetails(SYBASE_DETAILS);
+ addDetails(ORACLE_DETAILS);
+ }
+
+ public static JDBCDetails getDetails(String vendor)
+ {
+ return VENDOR_DETAILS.get(vendor);
+ }
+
+ public static JDBCDetails getDefaultDetails()
+ {
+ return DERBY_DETAILS;
+ }
+
+ private static void addDetails(JDBCDetails details)
+ {
+ VENDOR_DETAILS.put(details.getVendor(), details);
+ }
+
+ private final String _vendor;
+ private String _blobType;
+ private String _varBinaryType;
+ private String _bigintType;
+ private boolean _useBytesMethodsForBlob;
+
+ JDBCDetails(String vendor,
+ String blobType,
+ String varBinaryType,
+ String bigIntType,
+ boolean useBytesMethodsForBlob)
+ {
+ _vendor = vendor;
+ setBlobType(blobType);
+ setVarBinaryType(varBinaryType);
+ setBigintType(bigIntType);
+ setUseBytesMethodsForBlob(useBytesMethodsForBlob);
+ }
+
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ JDBCDetails that = (JDBCDetails) o;
+
+ if (!getVendor().equals(that.getVendor()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getVendor().hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JDBCDetails{" +
+ "vendor='" + getVendor() + '\'' +
+ ", blobType='" + getBlobType() + '\'' +
+ ", varBinaryType='" + getVarBinaryType() + '\'' +
+ ", bigIntType='" + getBigintType() + '\'' +
+ ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
+ '}';
+ }
+
+ public String getVendor()
+ {
+ return _vendor;
+ }
+
+ public String getBlobType()
+ {
+ return _blobType;
+ }
+
+ public void setBlobType(String blobType)
+ {
+ _blobType = blobType;
+ }
+
+ public String getVarBinaryType()
+ {
+ return _varBinaryType;
+ }
+
+ public void setVarBinaryType(String varBinaryType)
+ {
+ _varBinaryType = varBinaryType;
+ }
+
+ public boolean isUseBytesMethodsForBlob()
+ {
+ return _useBytesMethodsForBlob;
+ }
+
+ public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
+ {
+ _useBytesMethodsForBlob = useBytesMethodsForBlob;
+ }
+
+ public String getBigintType()
+ {
+ return _bigintType;
+ }
+
+ public void setBigintType(String bigintType)
+ {
+ _bigintType = bigintType;
+ }
+}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
deleted file mode 100644
index 61ecb6748c..0000000000
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.store.jdbc;
-
-
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
-import org.apache.qpid.server.store.AbstractJDBCMessageStore;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreProvider;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.util.MapValueConverter;
-
-/**
- * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
- * mechanism.
- *
- */
-public class JDBCMessageStore extends AbstractJDBCMessageStore implements MessageStoreProvider
-{
-
- private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
-
- public static final String TYPE = "JDBC";
- public static final String CONNECTION_URL = "connectionUrl";
- public static final String CONNECTION_POOL_TYPE = "connectionPoolType";
- public static final String JDBC_BIG_INT_TYPE = "bigIntType";
- public static final String JDBC_BYTES_FOR_BLOB = "bytesForBlob";
- public static final String JDBC_VARBINARY_TYPE = "varbinaryType";
- public static final String JDBC_BLOB_TYPE = "blobType";
-
- protected String _connectionURL;
- private ConnectionProvider _connectionProvider;
-
- private final MessageStore _messageStoreFacade = new MessageStoreWrapper();
-
- private static class JDBCDetails
- {
- private final String _vendor;
- private String _blobType;
- private String _varBinaryType;
- private String _bigintType;
- private boolean _useBytesMethodsForBlob;
-
- private JDBCDetails(String vendor,
- String blobType,
- String varBinaryType,
- String bigIntType,
- boolean useBytesMethodsForBlob)
- {
- _vendor = vendor;
- setBlobType(blobType);
- setVarBinaryType(varBinaryType);
- setBigintType(bigIntType);
- setUseBytesMethodsForBlob(useBytesMethodsForBlob);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- JDBCDetails that = (JDBCDetails) o;
-
- if (!getVendor().equals(that.getVendor()))
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- return getVendor().hashCode();
- }
-
- @Override
- public String toString()
- {
- return "JDBCDetails{" +
- "vendor='" + getVendor() + '\'' +
- ", blobType='" + getBlobType() + '\'' +
- ", varBinaryType='" + getVarBinaryType() + '\'' +
- ", bigIntType='" + getBigintType() + '\'' +
- ", useBytesMethodsForBlob=" + isUseBytesMethodsForBlob() +
- '}';
- }
-
- public String getVendor()
- {
- return _vendor;
- }
-
- public String getBlobType()
- {
- return _blobType;
- }
-
- public void setBlobType(String blobType)
- {
- _blobType = blobType;
- }
-
- public String getVarBinaryType()
- {
- return _varBinaryType;
- }
-
- public void setVarBinaryType(String varBinaryType)
- {
- _varBinaryType = varBinaryType;
- }
-
- public boolean isUseBytesMethodsForBlob()
- {
- return _useBytesMethodsForBlob;
- }
-
- public void setUseBytesMethodsForBlob(boolean useBytesMethodsForBlob)
- {
- _useBytesMethodsForBlob = useBytesMethodsForBlob;
- }
-
- public String getBigintType()
- {
- return _bigintType;
- }
-
- public void setBigintType(String bigintType)
- {
- _bigintType = bigintType;
- }
- }
-
- private static JDBCDetails DERBY_DETAILS =
- new JDBCDetails("derby",
- "blob",
- "varchar(%d) for bit data",
- "bigint",
- false);
-
- private static JDBCDetails POSTGRESQL_DETAILS =
- new JDBCDetails("postgresql",
- "bytea",
- "bytea",
- "bigint",
- true);
-
- private static JDBCDetails MYSQL_DETAILS =
- new JDBCDetails("mysql",
- "blob",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails SYBASE_DETAILS =
- new JDBCDetails("sybase",
- "image",
- "varbinary(%d)",
- "bigint",
- false);
-
-
- private static JDBCDetails ORACLE_DETAILS =
- new JDBCDetails("oracle",
- "blob",
- "raw(%d)",
- "number",
- false);
-
-
- private static Map<String, JDBCDetails> VENDOR_DETAILS = new HashMap<String,JDBCDetails>();
-
- static
- {
-
- addDetails(DERBY_DETAILS);
- addDetails(POSTGRESQL_DETAILS);
- addDetails(MYSQL_DETAILS);
- addDetails(SYBASE_DETAILS);
- addDetails(ORACLE_DETAILS);
- }
-
- private static void addDetails(JDBCDetails details)
- {
- VENDOR_DETAILS.put(details.getVendor(), details);
- }
-
- private String _blobType;
- private String _varBinaryType;
- private String _bigIntType;
- private boolean _useBytesMethodsForBlob;
-
- private List<RecordedJDBCTransaction> _transactions = new CopyOnWriteArrayList<RecordedJDBCTransaction>();
-
-
- public JDBCMessageStore()
- {
- }
-
- protected Logger getLogger()
- {
- return _logger;
- }
-
- protected String getSqlBlobType()
- {
- return _blobType;
- }
-
- protected String getSqlVarBinaryType(int size)
- {
- return String.format(_varBinaryType, size);
- }
-
- public String getSqlBigIntType()
- {
- return _bigIntType;
- }
-
- @Override
- protected void doClose()
- {
- try
- {
- while(!_transactions.isEmpty())
- {
- RecordedJDBCTransaction txn = _transactions.get(0);
- txn.abortTran();
- }
- }
- finally
- {
- try
- {
- _connectionProvider.close();
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to close connection provider ", e);
- }
- }
- }
-
-
- protected Connection getConnection() throws SQLException
- {
- return _connectionProvider.getConnection();
- }
-
-
- protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings)
- throws ClassNotFoundException, SQLException
- {
- _connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
- Object poolAttribute = storeSettings.get(CONNECTION_POOL_TYPE);
-
- JDBCDetails details = null;
-
- String[] components = _connectionURL.split(":",3);
- if(components.length >= 2)
- {
- String vendor = components[1];
- details = VENDOR_DETAILS.get(vendor);
- }
-
- if(details == null)
- {
- getLogger().info("Do not recognize vendor from connection URL: " + _connectionURL);
-
- // TODO - is there a better default than derby
- details = DERBY_DETAILS;
- }
-
- String connectionPoolType = poolAttribute == null ? DefaultConnectionProviderFactory.TYPE : String.valueOf(poolAttribute);
-
- JDBCConnectionProviderFactory connectionProviderFactory =
- JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
- if(connectionProviderFactory == null)
- {
- _logger.warn("Unknown connection pool type: " + connectionPoolType + ". no connection pooling will be used");
- connectionProviderFactory = new DefaultConnectionProviderFactory();
- }
-
- _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, storeSettings);
- _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, storeSettings, details.getBlobType());
- _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, storeSettings, details.getVarBinaryType());
- _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, storeSettings, details.isUseBytesMethodsForBlob());
- _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, storeSettings, details.getBigintType());
- }
-
- @Override
- protected void storedSizeChange(int contentSize)
- {
- }
-
- public String getStoreLocation()
- {
- return _connectionURL;
- }
-
- @Override
- protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
- {
- if(_useBytesMethodsForBlob)
- {
- return rs.getBytes(col);
- }
- else
- {
- Blob dataAsBlob = rs.getBlob(col);
- return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
- }
- }
-
- @Override
- protected String getBlobAsString(ResultSet rs, int col) throws SQLException
- {
- byte[] bytes;
- if(_useBytesMethodsForBlob)
- {
- bytes = rs.getBytes(col);
- return new String(bytes,UTF8_CHARSET);
- }
- else
- {
- Blob blob = rs.getBlob(col);
- if(blob == null)
- {
- return null;
- }
- bytes = blob.getBytes(1, (int)blob.length());
- }
- return new String(bytes, UTF8_CHARSET);
-
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new RecordedJDBCTransaction();
- }
-
- private class RecordedJDBCTransaction extends JDBCTransaction
- {
- private RecordedJDBCTransaction()
- {
- super();
- JDBCMessageStore.this._transactions.add(this);
- }
-
- @Override
- public void commitTran()
- {
- try
- {
- super.commitTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- try
- {
- return super.commitTranAsync();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
-
- @Override
- public void abortTran()
- {
- try
- {
- super.abortTran();
- }
- finally
- {
- JDBCMessageStore.this._transactions.remove(this);
- }
- }
- }
-
- @Override
- public MessageStore getMessageStore()
- {
- return _messageStoreFacade;
- }
-
- private class MessageStoreWrapper implements MessageStore
- {
-
- @Override
- public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
- {
- JDBCMessageStore.this.openMessageStore(parent, messageStoreSettings);
- }
-
- @Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
- {
- return JDBCMessageStore.this.addMessage(metaData);
- }
-
- @Override
- public boolean isPersistent()
- {
- return JDBCMessageStore.this.isPersistent();
- }
-
- @Override
- public Transaction newTransaction()
- {
- return JDBCMessageStore.this.newTransaction();
- }
-
- @Override
- public void closeMessageStore()
- {
- JDBCMessageStore.this.closeMessageStore();
- }
-
- @Override
- public void addEventListener(final EventListener eventListener, final Event... events)
- {
- JDBCMessageStore.this.addEventListener(eventListener, events);
- }
-
- @Override
- public void upgradeStoreStructure() throws StoreException
- {
- }
-
- @Override
- public String getStoreLocation()
- {
- return JDBCMessageStore.this.getStoreLocation();
- }
-
- @Override
- public void onDelete()
- {
- JDBCMessageStore.this.onDelete();
- }
-
- @Override
- public void visitMessages(final MessageHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessages(handler);
- }
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitMessageInstances(handler);
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- JDBCMessageStore.this.visitDistributedTransactions(handler);
- }
- }
-
-}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
index 1dd39a8696..85e8f89dbe 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCMessageStore;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@ManagedObject(category = false, type = JDBCVirtualHost.VIRTUAL_HOST_TYPE)
@@ -45,6 +45,6 @@ public class JDBCVirtualHost extends AbstractVirtualHost<JDBCVirtualHost>
@Override
protected MessageStore createMessageStore()
{
- return new JDBCMessageStore().getMessageStore();
+ return new GenericJDBCMessageStore();
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
index 2108b4e09a..ab8f4554cb 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.jdbc.JDBCMessageStore;
+import org.apache.qpid.server.store.jdbc.GenericJDBCConfigurationStore;
import org.apache.qpid.server.virtualhostnode.AbstractStandardVirtualHostNode;
@ManagedObject(type = JDBCVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category = false )
@@ -62,7 +62,7 @@ public class JDBCVirtualHostNodeImpl extends AbstractStandardVirtualHostNode<JDB
@Override
protected DurableConfigurationStore createConfigurationStore()
{
- return new JDBCMessageStore();
+ return new GenericJDBCConfigurationStore();
}
@Override
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 1f03b7f75f..8261e93347 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -52,7 +52,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
public void testOnDelete() throws Exception
{
- Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
+ Set<String> expectedTables = GenericJDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
assertTablesExist(expectedTables, true);
getStore().closeMessageStore();
assertTablesExist(expectedTables, true);
@@ -65,7 +65,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL);
+ messageStoreSettings.put(GenericJDBCMessageStore.CONNECTION_URL, _connectionURL);
return messageStoreSettings;
}
@@ -73,7 +73,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
@Override
protected MessageStore createMessageStore()
{
- return (new JDBCMessageStore()).getMessageStore();
+ return new GenericJDBCMessageStore();
}
private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException