summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java350
1 files changed, 5 insertions, 345 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 154d7e6535..e9946d1860 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.derby;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -41,7 +40,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -236,7 +231,7 @@ public class DerbyMessageStore implements MessageStore
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
- private static final String DERBY_STORE_TYPE = "DERBY";
+ public static final String TYPE = "DERBY";
private final StateManager _stateManager;
@@ -572,8 +567,7 @@ public class DerbyMessageStore implements MessageStore
BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
- recoverBrokerLinks(lrh);
+ brh.completeBindingRecovery();
}
catch (SQLException e)
{
@@ -581,144 +575,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
- throws SQLException
- {
- _logger.info("Recovering broker links...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
-
- try
- {
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(4);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
- recoverBridges(brh, id);
-
- }
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
- throws SQLException
- {
- _logger.info("Recovering bridges for link " + linkId + "...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
-
- try
- {
- stmt.setLong(1, linkId.getLeastSignificantBits());
- stmt.setLong(2, linkId.getMostSignificantBits());
-
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- UUID id = new UUID(rs.getLong(2), rs.getLong(1));
- long createTime = rs.getLong(3);
- Blob argumentsAsBlob = rs.getBlob(6);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
- int size = dis.readInt();
-
- Map<String,String> arguments = new HashMap<String, String>();
-
- for(int i = 0; i < size; i++)
- {
- arguments.put(dis.readUTF(), dis.readUTF());
- }
-
- brh.bridge(id, createTime, arguments);
-
- }
- brh.completeBridgeRecoveryForLink();
- }
- catch (IOException e)
- {
- throw new SQLException(e.getMessage(), e);
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
@Override
public void close() throws Exception
{
@@ -975,71 +831,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- @Override
- public void createBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
-
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
- try
- {
-
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
-
- try
- {
-
- insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
- insertStmt.setLong(3, link.getCreateTime());
-
- byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(4,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
- }
- }
- }
-
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
byte[] argumentBytes;
@@ -1072,139 +863,7 @@ public class DerbyMessageStore implements MessageStore
return argumentBytes;
}
- @Override
- public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
- {
- _logger.debug("public void deleteBrokerLink( " + link + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_LINKS);
- stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, link.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Link " + link + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
-
- }
-
- @Override
- public void createBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
- if (_stateManager.isInState(State.ACTIVE))
- {
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
- try
- {
-
- UUID id = bridge.getQMFId();
- stmt.setLong(1, id.getLeastSignificantBits());
- stmt.setLong(2, id.getMostSignificantBits());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
-
- try
- {
-
- insertStmt.setLong(1, id.getLeastSignificantBits());
- insertStmt.setLong(2, id.getMostSignificantBits());
-
- insertStmt.setLong(3, bridge.getCreateTime());
-
- UUID linkId = bridge.getLink().getQMFId();
- insertStmt.setLong(4, linkId.getLeastSignificantBits());
- insertStmt.setLong(5, linkId.getMostSignificantBits());
-
- byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
- ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
- insertStmt.setBinaryStream(6,bis,argumentBytes.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
- }
- }
- }
-
- @Override
- public void deleteBridge(final Bridge bridge) throws AMQStoreException
- {
- _logger.debug("public void deleteBridge( " + bridge + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
- {
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
- stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
- stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Bridge " + bridge + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
- }
-
- }
@Override
public Transaction newTransaction()
@@ -2134,8 +1793,9 @@ public class DerbyMessageStore implements MessageStore
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(offsetInMessage, buf);
+ int length = getContent(offsetInMessage, buf);
buf.position(0);
+ buf.limit(length);
return buf;
}
@@ -2673,7 +2333,7 @@ public class DerbyMessageStore implements MessageStore
@Override
public String getStoreType()
{
- return DERBY_STORE_TYPE;
+ return TYPE;
}
} \ No newline at end of file