summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/bdbstore/bin/backup.sh2
-rwxr-xr-xqpid/java/bdbstore/bin/storeUpgrade.sh2
-rwxr-xr-xqpid/java/bdbstore/etc/scripts/bdbbackuptest.sh44
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java193
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java120
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java165
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java15
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java19
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java130
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java179
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java31
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java97
-rw-r--r--qpid/java/test-profiles/JavaDerbyExcludes1
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes1
15 files changed, 590 insertions, 411 deletions
diff --git a/qpid/java/bdbstore/bin/backup.sh b/qpid/java/bdbstore/bin/backup.sh
index b58ab16282..75f9e1d968 100755
--- a/qpid/java/bdbstore/bin/backup.sh
+++ b/qpid/java/bdbstore/bin/backup.sh
@@ -33,7 +33,7 @@ if [ -z "$QPID_HOME" ]; then
fi
VERSION=0.15
-LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
+LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
echo "Starting Hot Backup Script"
diff --git a/qpid/java/bdbstore/bin/storeUpgrade.sh b/qpid/java/bdbstore/bin/storeUpgrade.sh
index ffb33f7fbd..dd53529a22 100755
--- a/qpid/java/bdbstore/bin/storeUpgrade.sh
+++ b/qpid/java/bdbstore/bin/storeUpgrade.sh
@@ -34,6 +34,6 @@ fi
VERSION=0.15
-LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
+LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar
java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade ${ARGS}
diff --git a/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh b/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh
deleted file mode 100755
index 4224f98de2..0000000000
--- a/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-VERSION=0.15
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.6.1.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS}
-
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
new file mode 100644
index 0000000000..342c185b99
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.Piper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ * Tests the BDB backup script can successfully perform a backup and that
+ * backup can be restored and used by the Broker.
+ */
+public class BDBBackupTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class);
+
+ private static final String BACKUP_SCRIPT = "/bin/backup.sh";
+ private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed";
+
+ private static final String TEST_VHOST = "test";
+ private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir");
+
+ private File _backupToDir;
+ private File _backupFromDir;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName());
+ _backupToDir.mkdirs();
+
+ final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory();
+
+ // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...")
+ // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file
+ _backupFromDir = new File(qpidWork + "/bdbstore/" + TEST_VHOST + "-store");
+ boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
+ assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ FileUtils.delete(_backupToDir, true);
+ }
+ }
+
+ public void testBackupAndRestoreMaintainsMessages() throws Exception
+ {
+ sendNumberedMessages(0, 10);
+ invokeBdbBackup(_backupFromDir, _backupToDir);
+ sendNumberedMessages(10, 20);
+ confirmBrokerHasMessages(0, 20);
+ stopBroker();
+
+ deleteStore(_backupFromDir);
+ replaceStoreWithBackup(_backupToDir, _backupFromDir);
+
+ startBroker();
+ confirmBrokerHasMessages(0, 10);
+ }
+
+ private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception
+ {
+ Connection con = getConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(getTestQueueName());
+ // Create queue by consumer side-effect
+ session.createConsumer(destination).close();
+
+ final int numOfMessages = endIndex - startIndex;
+ final int batchSize = 0;
+ sendMessage(session, destination, numOfMessages, startIndex, batchSize);
+ con.close();
+ }
+
+ private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception
+ {
+ Connection con = getConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ con.start();
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = startIndex; i < endIndex; i++)
+ {
+ Message msg = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message " + i + " not received", msg);
+ assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
+ }
+
+ Message msg = consumer.receive(100);
+ if(msg != null)
+ {
+ fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
+ }
+ con.close();
+ }
+
+ private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception
+ {
+ if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows"))
+ {
+ BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()});
+ }
+ else
+ {
+ runBdbBackupScript(backupFromDir, backupToDir);
+ }
+ }
+
+ private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException,
+ InterruptedException
+ {
+ Process backupProcess = null;
+ try
+ {
+ String qpidHome = System.getProperty(QPID_HOME);
+ ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath());
+ pb.redirectErrorStream(true);
+ Map<String, String> env = pb.environment();
+ env.put(QPID_HOME, qpidHome);
+
+ LOGGER.debug("Backup command is " + pb.command());
+ backupProcess = pb.start();
+ Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE);
+ piper.start();
+ piper.await(2, TimeUnit.SECONDS);
+ backupProcess.waitFor();
+ piper.join();
+
+ LOGGER.debug("Backup command completed " + backupProcess.exitValue());
+ assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue());
+ }
+ finally
+ {
+ if (backupProcess != null)
+ {
+ backupProcess.getErrorStream().close();
+ backupProcess.getInputStream().close();
+ backupProcess.getOutputStream().close();
+ }
+ }
+ }
+
+ private void replaceStoreWithBackup(File source, File dst) throws Exception
+ {
+ LOGGER.debug("Copying store " + source + " to " + dst);
+ FileUtils.copyRecursive(source, dst);
+ }
+
+ private void deleteStore(File storeDir)
+ {
+ LOGGER.debug("Deleting store " + storeDir);
+ FileUtils.delete(storeDir, true);
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
deleted file mode 100644
index f6344b3d7d..0000000000
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.testclient;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.ping.PingDurableClient;
-import org.apache.qpid.server.store.berkeleydb.BDBBackup;
-import org.apache.qpid.util.CommandLineParser;
-
-import java.util.Properties;
-
-/**
- * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable
- * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered
- * messages were in the backup, in order to check that all are re-delivered when the backup is retored.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Perform BDB Backup on configurable message count.
- * </table>
- */
-public class BackupTestClient extends PingDurableClient
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(BackupTestClient.class);
-
- /** Holds the from directory to take backups from. */
- private String fromDir;
-
- /** Holds the to directory to store backups in. */
- private String toDir;
-
- /**
- * Default constructor, passes all property overrides to the parent.
- *
- * @param overrides Any property overrides to apply to the defaults.
- *
- * @throws Exception Any underlying exception is allowed to fall through.
- */
- BackupTestClient(Properties overrides) throws Exception
- {
- super(overrides);
- }
-
- /**
- * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified
- * on the command line:
- *
- * <p/><table><caption>Command Line</caption>
- * <tr><th> Option <th> Comment
- * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from.
- * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to.
- * </table>
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- // Use the same command line format as BDBBackup utility, (compulsory from and to directories).
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC),
- System.getProperties());
- BackupTestClient pingProducer = new BackupTestClient(options);
-
- // Keep the from and to directories for backups.
- pingProducer.fromDir = options.getProperty("fromdir");
- pingProducer.toDir = options.getProperty("todir");
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- // pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Run the test procedure.
- int sent = pingProducer.send();
- pingProducer.waitForUser("Press return to begin receiving the pings.");
- pingProducer.receive(sent);
-
- System.exit(0);
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- /**
- * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup.
- */
- public void takeAction()
- {
- BDBBackup backupUtil = new BDBBackup();
- backupUtil.takeBackupNoLock(fromDir, toDir);
- System.out.println("Took backup of BDB log files from directory: " + fromDir);
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
index 3eac80f175..555c4dd20d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
@@ -26,146 +26,117 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
public class PersistentStoreTest extends QpidBrokerTestCase
{
-
private static final int NUM_MESSAGES = 100;
private Connection _con;
private Session _session;
- private Queue _destination;
- private MessageConsumer _consumer;
+ private Destination _destination;
- public void setUp() throws Exception, JMSException
+ public void setUp() throws Exception
{
super.setUp();
_con = getConnection();
- _con.start();
- _session = _con.createSession(true, Session.SESSION_TRANSACTED);
- _destination = _session.createQueue(getTestQueueName());
- _consumer = _session.createConsumer(_destination);
- _consumer.close();
+ }
- sendMessage(_session, _destination, NUM_MESSAGES);
- _session.commit();
+ public void testCommittedMessagesSurviveBrokerNormalShutdown() throws Exception
+ {
+ sendAndCommitMessages();
+ stopBroker();
+ startBroker();
+ confirmBrokerStillHasCommittedMessages();
}
- /** Checks that a new consumer on a new connection can get NUM_MESSAGES from _destination */
- private void checkMessages() throws Exception, JMSException
+ public void testCommittedMessagesSurviveBrokerAbnormalShutdown() throws Exception
{
- _con = getConnection();
- _session = _con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _con.start();
- _consumer = _session.createConsumer(_destination);
- for (int i = 1; i <= NUM_MESSAGES; i++)
- {
- Message msg = _consumer.receive(RECEIVE_TIMEOUT);
- assertNotNull("Message " + i + " not received", msg);
- assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
- }
-
- Message msg = _consumer.receive(100);
- if(msg != null)
+ if (isInternalBroker())
{
- fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
+ return;
}
- }
-// /**
-// * starts the server, sends 100 messages, restarts the server and gets 100 messages back
-// * the test formerly referred to as BDB-Qpid-1
-// * @throws Exception
-// */
-// public void testStartStop() throws Exception
-// {
-// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1
-// checkMessages();
-// }
+ sendAndCommitMessages();
+ killBroker();
+ startBroker();
+ confirmBrokerStillHasCommittedMessages();
+ }
- /**
- * starts the server, sends 100 messages, nukes then starts the server and gets 100 messages back
- * the test formerly referred to as BDB-Qpid-2
- *
- * @throws Exception
- */
- public void testForcibleStartStop() throws Exception
+ public void testCommittedMessagesSurviveBrokerNormalShutdownMidTransaction() throws Exception
{
- restartBroker();
- checkMessages();
+ sendAndCommitMessages();
+ sendMoreMessagesWithoutCommitting();
+ stopBroker();
+ startBroker();
+ confirmBrokerStillHasCommittedMessages();
}
-// /**
-// * starts the server, sends 100 committed messages, 5 uncommited ones,
-// * restarts the server and gets 100 messages back
-// * the test formerly referred to as BDB-Qpid-5
-// * @throws Exception
-// */
-// public void testStartStopMidTransaction() throws Exception
-// {
-// sendMessage(_session, _destination, 5);
-// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1
-// checkMessages();
-// }
+ public void testCommittedMessagesSurviveBrokerAbnormalShutdownMidTransaction() throws Exception
+ {
+ if (isInternalBroker())
+ {
+ return;
+ }
+ sendAndCommitMessages();
+ sendMoreMessagesWithoutCommitting();
+ killBroker();
+ startBroker();
+ confirmBrokerStillHasCommittedMessages();
+ }
- /**
- * starts the server, sends 100 committed messages, 5 uncommited ones,
- * nukes and starts the server and gets 100 messages back
- * the test formerly referred to as BDB-Qpid-6
- *
- * @throws Exception
- */
- public void testForcibleStartStopMidTransaction() throws Exception
+ private void sendAndCommitMessages() throws Exception
{
- sendMessage(_session, _destination, 5);
- //sync to ensure that the above messages have reached the broker
- ((AMQSession) _session).sync();
- restartBroker();
- checkMessages();
+ _session = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _destination = _session.createQueue(getTestQueueName());
+ // Create queue by consumer side-effect
+ _session.createConsumer(_destination).close();
+
+ sendMessage(_session, _destination, NUM_MESSAGES);
+ _session.commit();
}
- /**
- * starts the server, sends 100 committed messages, 5 uncommited ones,
- * restarts the client and gets 100 messages back.
- * the test formerly referred to as BDB-Qpid-7
- *
- * FIXME: is this a PersistentStoreTest? Seems more like a transaction test to me.. aidan
- *
- * @throws Exception
- */
- public void testClientDeathMidTransaction() throws Exception
+ private void sendMoreMessagesWithoutCommitting() throws Exception
{
sendMessage(_session, _destination, 5);
- _con.close();
- checkMessages();
+ // sync to ensure that messages have reached the broker
+ ((AMQSession<?,?>) _session).sync();
}
-// /**
-// * starts the server, sends 50 committed messages, copies $QPID_WORK to a new location,
-// * sends 10 messages, stops the server, nukes the store, restores the copy, starts the server
-// * checks that we get the first 50 back.
-// */
-// public void testHotBackup()
-// {
-// -- removing as this will leave 100msgs on a queue
-// }
+ private void confirmBrokerStillHasCommittedMessages() throws Exception
+ {
+ Connection con = getConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ con.start();
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = 1; i <= NUM_MESSAGES; i++)
+ {
+ Message msg = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message " + i + " not received", msg);
+ assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX));
+ }
+
+ Message msg = consumer.receive(100);
+ if(msg != null)
+ {
+ fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX));
+ }
+ }
/**
- * This test requires that we can send messages without commiting.
+ * This test requires that we can send messages without committing.
* QTC always commits the messages sent via sendMessages.
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
- * @return the sent messges
+ * @return the sent messages
*
* @throws Exception
*/
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
index 8345803d56..66b3fe0c6a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
@@ -22,5 +22,7 @@ package org.apache.qpid.test.utils;
public interface BrokerHolder
{
+ String getWorkingDirectory();
void shutdown();
+ void kill();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 93e71a8cbe..f6c481431a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -34,8 +34,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase
public static final long DEFAULT_FAILOVER_TIME = 10000L;
- protected int failingPort;
-
protected void setUp() throws java.lang.Exception
{
super.setUp();
@@ -66,15 +64,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase
return _connectionFactory;
}
- @Override
- public void stopBroker(int port) throws Exception
- {
- if (isBrokerPresent(port))
- {
- super.stopBroker(port);
- }
- }
-
public void tearDown() throws Exception
{
try
@@ -90,11 +79,11 @@ public class FailoverBaseCase extends QpidBrokerTestCase
}
}
-
public void failBroker(int port)
{
try
{
+ //TODO: use killBroker instead
stopBroker(port);
}
catch (Exception e)
@@ -102,6 +91,4 @@ public class FailoverBaseCase extends QpidBrokerTestCase
throw new RuntimeException(e);
}
}
-
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
index d394430079..adda9ca3ec 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
@@ -27,9 +27,11 @@ import org.apache.qpid.server.Broker;
public class InternalBrokerHolder implements BrokerHolder
{
private static final Logger LOGGER = Logger.getLogger(InternalBrokerHolder.class);
+
private final Broker _broker;
+ private final String _workingDirectory;
- public InternalBrokerHolder(final Broker broker)
+ public InternalBrokerHolder(final Broker broker, String workingDirectory)
{
if(broker == null)
{
@@ -37,6 +39,13 @@ public class InternalBrokerHolder implements BrokerHolder
}
_broker = broker;
+ _workingDirectory = workingDirectory;
+ }
+
+ @Override
+ public String getWorkingDirectory()
+ {
+ return _workingDirectory;
}
public void shutdown()
@@ -48,4 +57,12 @@ public class InternalBrokerHolder implements BrokerHolder
LOGGER.info("Broker instance shutdown");
}
+ @Override
+ public void kill()
+ {
+ // Can't kill a internal broker as we would also kill ourselves as we share the same JVM.
+ shutdown();
+ }
+
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java
new file mode 100644
index 0000000000..9413e38606
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+public final class Piper extends Thread
+{
+ private static final Logger LOGGER = Logger.getLogger(Piper.class);
+
+ private final BufferedReader _in;
+ private final PrintStream _out;
+ private final String _ready;
+ private final CountDownLatch _latch;
+ private final String _stopped;
+ private final String _prefix;
+ private volatile boolean _seenReady;
+ private volatile String _stopLine;
+
+ public Piper(InputStream in, PrintStream out, String ready, String stopped)
+ {
+ this(in, out, ready, stopped, null);
+ }
+
+ public Piper(InputStream in, PrintStream out, String ready, String stopped, String prefix)
+ {
+ _in = new BufferedReader(new InputStreamReader(in));
+ _out = out;
+ _ready = ready;
+ _stopped = stopped;
+ _seenReady = false;
+ _prefix = prefix;
+
+ if (this._ready != null && !this._ready.equals(""))
+ {
+ this._latch = new CountDownLatch(1);
+ }
+ else
+ {
+ this._latch = null;
+ }
+ }
+
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ if (_latch == null)
+ {
+ return true;
+ }
+ else
+ {
+ _latch.await(timeout, unit);
+ return _seenReady;
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ String line;
+ while ((line = _in.readLine()) != null)
+ {
+ if (_prefix != null)
+ {
+ line = _prefix + line;
+ }
+ _out.println(line);
+
+ if (_latch != null && line.contains(_ready))
+ {
+ _seenReady = true;
+ _latch.countDown();
+ }
+
+ if (!_seenReady && line.contains(_stopped))
+ {
+ _stopLine = line;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn(e.getMessage() + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
+ }
+ finally
+ {
+ if (_latch != null)
+ {
+ _latch.countDown();
+ }
+ }
+ }
+
+ public String getStopLine()
+ {
+ return _stopLine;
+ }
+
+ String getReady()
+ {
+ return _ready;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 6311322522..32c6094adb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -59,9 +59,6 @@ import javax.naming.NamingException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.net.URL;
@@ -69,7 +66,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -126,7 +122,6 @@ public class QpidBrokerTestCase extends QpidTestCase
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
private static final String BROKER_PERSITENT = "broker.persistent";
private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes";
-
// values
protected static final String JAVA = "java";
@@ -154,7 +149,7 @@ public class QpidBrokerTestCase extends QpidTestCase
protected File _outputFile;
- protected PrintStream _brokerOutputStream;
+ protected PrintStream _testcaseOutputStream;
protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>();
@@ -195,10 +190,10 @@ public class QpidBrokerTestCase extends QpidTestCase
super();
}
- public Logger getLogger()
- {
- return QpidBrokerTestCase._logger;
- }
+ public Logger getLogger()
+ {
+ return QpidBrokerTestCase._logger;
+ }
public void runBare() throws Throwable
{
@@ -228,12 +223,12 @@ public class QpidBrokerTestCase extends QpidTestCase
if (_interleaveBrokerLog)
{
- _brokerOutputStream = out;
+ _testcaseOutputStream = out;
}
else
{
- _brokerOutputStream = new PrintStream(new FileOutputStream(String
- .format("%s/TEST-%s.broker.out", _output, qname)), true);
+ _testcaseOutputStream = new PrintStream(new FileOutputStream(String
+ .format("%s/TEST-%s.broker.out", _output, qname)), true);
}
}
@@ -278,7 +273,7 @@ public class QpidBrokerTestCase extends QpidTestCase
out.close();
if (!_interleaveBrokerLog)
{
- _brokerOutputStream.close();
+ _testcaseOutputStream.close();
}
}
}
@@ -307,108 +302,6 @@ public class QpidBrokerTestCase extends QpidTestCase
startBroker();
}
- private static final class Piper extends Thread
- {
-
- private LineNumberReader in;
- private PrintStream out;
- private String ready;
- private CountDownLatch latch;
- private boolean seenReady;
- private String stopped;
- private String stopLine;
-
- public Piper(InputStream in, PrintStream out, String ready)
- {
- this(in, out, ready, null);
- }
-
- public Piper(InputStream in, PrintStream out, String ready, String stopped)
- {
- this.in = new LineNumberReader(new InputStreamReader(in));
- this.out = out;
- this.ready = ready;
- this.stopped = stopped;
- this.seenReady = false;
-
- if (this.getReady() != null && !this.getReady().equals(""))
- {
- this.latch = new CountDownLatch(1);
- }
- else
- {
- this.latch = null;
- }
- }
-
- public Piper(InputStream in, PrintStream out)
- {
- this(in, out, null);
- }
-
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException
- {
- if (latch == null)
- {
- return true;
- }
- else
- {
- latch.await(timeout, unit);
- return seenReady;
- }
- }
-
- public void run()
- {
- try
- {
- String line;
- while ((line = in.readLine()) != null)
- {
- if (_interleaveBrokerLog)
- {
- line = _brokerLogPrefix + line;
- }
- out.println(line);
-
- if (latch != null && line.contains(getReady()))
- {
- seenReady = true;
- latch.countDown();
- }
-
- if (!seenReady && line.contains(stopped))
- {
- stopLine = line;
- }
- }
- }
- catch (IOException e)
- {
- // this seems to happen regularly even when
- // exits are normal
- }
- finally
- {
- if (latch != null)
- {
- latch.countDown();
- }
- }
- }
-
- public String getStopLine()
- {
- return stopLine;
- }
-
- public String getReady()
- {
- return ready;
- }
- }
-
/**
* Return the management port in use by the broker on this main port
*
@@ -494,7 +387,7 @@ public class QpidBrokerTestCase extends QpidTestCase
_logger.info("starting internal broker (same JVM)");
broker.startup(options);
- _brokers.put(port, new InternalBrokerHolder(broker));
+ _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK")));
}
else if (!_brokerType.equals(BrokerType.EXTERNAL))
{
@@ -568,12 +461,13 @@ public class QpidBrokerTestCase extends QpidTestCase
// cpp broker requires that the work directory is created
createBrokerWork(qpidWork);
- Process process = pb.start();;
+ Process process = pb.start();
Piper p = new Piper(process.getInputStream(),
- _brokerOutputStream,
+ _testcaseOutputStream,
System.getProperty(BROKER_READY),
- System.getProperty(BROKER_STOPPED));
+ System.getProperty(BROKER_STOPPED),
+ _interleaveBrokerLog ? _brokerLogPrefix : null);
p.start();
@@ -600,7 +494,7 @@ public class QpidBrokerTestCase extends QpidTestCase
// this is expect if the broker started successfully
}
- _brokers.put(port, new SpawnedBrokerHolder(process));
+ _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork));
}
}
@@ -747,11 +641,31 @@ public class QpidBrokerTestCase extends QpidTestCase
public void stopBroker(int port) throws Exception
{
- port = getPort(port);
+ if (isBrokerPresent(port))
+ {
+ port = getPort(port);
- _logger.info("stopping broker on port : " + port);
- BrokerHolder broker = _brokers.remove(port);
- broker.shutdown();
+ _logger.info("stopping broker on port : " + port);
+ BrokerHolder broker = _brokers.remove(port);
+ broker.shutdown();
+ }
+ }
+
+ public void killBroker() throws Exception
+ {
+ killBroker(0);
+ }
+
+ public void killBroker(int port) throws Exception
+ {
+ if (isBrokerPresent(port))
+ {
+ port = getPort(port);
+
+ _logger.info("killing broker on port : " + port);
+ BrokerHolder broker = _brokers.remove(port);
+ broker.kill();
+ }
}
public boolean isBrokerPresent(int port) throws Exception
@@ -760,7 +674,13 @@ public class QpidBrokerTestCase extends QpidTestCase
return _brokers.containsKey(port);
}
-
+
+ public BrokerHolder getBroker(int port) throws Exception
+ {
+ port = getPort(port);
+ return _brokers.get(port);
+ }
+
/**
* Attempt to set the Java Broker to use the BDBMessageStore for persistence
* Falling back to the DerbyMessageStore if
@@ -989,7 +909,6 @@ public class QpidBrokerTestCase extends QpidTestCase
/**
* we assume that the environment is correctly set
* i.e. -Djava.naming.provider.url="..//example010.properties"
- * TODO should be a way of setting that through maven
*
* @return an initial context
*
@@ -1163,13 +1082,13 @@ public class QpidBrokerTestCase extends QpidTestCase
/**
* Send messages to the given destination.
*
- * If session is transacted then messages will be commited before returning
+ * If session is transacted then messages will be committed before returning
*
* @param session the session to use for sending
* @param destination where to send them to
* @param count no. of messages to send
*
- * @return the sent messges
+ * @return the sent messages
*
* @throws Exception
*/
@@ -1362,6 +1281,6 @@ public class QpidBrokerTestCase extends QpidTestCase
protected int getFailingPort()
{
- return FAILING_PORT;
+ return FAILING_PORT;
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java
index 7946c6a6d1..83294c13ad 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.utils;
import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -225,4 +226,34 @@ public class ReflectionUtils
throw new ReflectionUtilsException("NoSuchMethodException", e);
}
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T getDeclaredField(final Object obj, final String fieldName)
+ {
+ try
+ {
+ final Field field = obj.getClass().getDeclaredField(fieldName);
+ if (!field.isAccessible())
+ {
+ field.setAccessible(true);
+ }
+ return (T) field.get(obj);
+ }
+ catch (NoSuchFieldException e)
+ {
+ throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e);
+ }
+ catch (SecurityException e)
+ {
+ throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e);
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
index 65239bbe02..50b1ea7cea 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
@@ -20,15 +20,20 @@
*/
package org.apache.qpid.test.utils;
+import java.io.IOException;
+
import org.apache.log4j.Logger;
public class SpawnedBrokerHolder implements BrokerHolder
{
private static final Logger LOGGER = Logger.getLogger(SpawnedBrokerHolder.class);
+ private final boolean _isWindows = String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows");
private final Process _process;
+ private final Integer _pid;
+ private final String _workingDirectory;
- public SpawnedBrokerHolder(final Process process)
+ public SpawnedBrokerHolder(final Process process, final String workingDirectory)
{
if(process == null)
{
@@ -36,14 +41,87 @@ public class SpawnedBrokerHolder implements BrokerHolder
}
_process = process;
+ _pid = retrieveUnixPidIfPossible();
+ _workingDirectory = workingDirectory;
+ }
+
+ @Override
+ public String getWorkingDirectory()
+ {
+ return _workingDirectory;
}
public void shutdown()
{
LOGGER.info("Destroying broker process");
-
_process.destroy();
+ reapChildProcess();
+ }
+
+ @Override
+ public void kill()
+ {
+ if (_pid == null)
+ {
+ LOGGER.info("Destroying broker process");
+ _process.destroy();
+ }
+ else
+ {
+ LOGGER.info("Killing broker process with PID " + _pid);
+ sendSigkillForImmediateShutdown(_pid);
+ }
+
+ reapChildProcess();
+ }
+
+ private void sendSigkillForImmediateShutdown(Integer pid)
+ {
+ boolean killSuccessful = false;
+ try
+ {
+ final Process killProcess = Runtime.getRuntime().exec("kill -KILL " + pid);
+ killProcess.waitFor();
+ killSuccessful = killProcess.exitValue() == 0;
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Error whilst killing process " + _pid, e);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ if (!killSuccessful)
+ {
+ _process.destroy();
+ }
+ }
+ }
+
+ private Integer retrieveUnixPidIfPossible()
+ {
+ if(!_isWindows)
+ {
+ try
+ {
+ Integer pid = ReflectionUtils.getDeclaredField(_process, "pid");
+ LOGGER.info("PID " + pid);
+ return pid;
+ }
+ catch (ReflectionUtilsException e)
+ {
+ LOGGER.warn("Could not get pid for process, Broker process shutdown will be ungraceful");
+ }
+ }
+ return null;
+ }
+
+ private void reapChildProcess()
+ {
try
{
_process.waitFor();
@@ -51,8 +129,21 @@ public class SpawnedBrokerHolder implements BrokerHolder
}
catch (InterruptedException e)
{
- LOGGER.error("Interrupted whilst waiting for process destruction");
+ LOGGER.error("Interrupted whilst waiting for process shutdown");
Thread.currentThread().interrupt();
}
+ finally
+ {
+ try
+ {
+ _process.getInputStream().close();
+ _process.getErrorStream().close();
+ _process.getOutputStream().close();
+ }
+ catch (IOException e)
+ {
+ }
+ }
}
+
}
diff --git a/qpid/java/test-profiles/JavaDerbyExcludes b/qpid/java/test-profiles/JavaDerbyExcludes
index 3caa360d48..931a0b0ddb 100644
--- a/qpid/java/test-profiles/JavaDerbyExcludes
+++ b/qpid/java/test-profiles/JavaDerbyExcludes
@@ -19,3 +19,4 @@
org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#*
org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#*
+org.apache.qpid.server.store.berkeleydb.BDBBackupTest#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 67190a6fcc..b4e583ba3a 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -34,3 +34,4 @@ org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#*
org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#*
+org.apache.qpid.server.store.berkeleydb.BDBBackupTest#*