diff options
Diffstat (limited to 'qpid/java')
15 files changed, 590 insertions, 411 deletions
diff --git a/qpid/java/bdbstore/bin/backup.sh b/qpid/java/bdbstore/bin/backup.sh index b58ab16282..75f9e1d968 100755 --- a/qpid/java/bdbstore/bin/backup.sh +++ b/qpid/java/bdbstore/bin/backup.sh @@ -33,7 +33,7 @@ if [ -z "$QPID_HOME" ]; then fi VERSION=0.15 -LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar +LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar echo "Starting Hot Backup Script" diff --git a/qpid/java/bdbstore/bin/storeUpgrade.sh b/qpid/java/bdbstore/bin/storeUpgrade.sh index ffb33f7fbd..dd53529a22 100755 --- a/qpid/java/bdbstore/bin/storeUpgrade.sh +++ b/qpid/java/bdbstore/bin/storeUpgrade.sh @@ -34,6 +34,6 @@ fi VERSION=0.15 -LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar +LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade ${ARGS} diff --git a/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh b/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh deleted file mode 100755 index 4224f98de2..0000000000 --- a/qpid/java/bdbstore/etc/scripts/bdbbackuptest.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -# Parse arguements taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -VERSION=0.15 - -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.6.1.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS - -. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS} - diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java new file mode 100644 index 0000000000..342c185b99 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.Piper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; + +/** + * Tests the BDB backup script can successfully perform a backup and that + * backup can be restored and used by the Broker. + */ +public class BDBBackupTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); + + private static final String BACKUP_SCRIPT = "/bin/backup.sh"; + private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; + + private static final String TEST_VHOST = "test"; + private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); + + private File _backupToDir; + private File _backupFromDir; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); + _backupToDir.mkdirs(); + + final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory(); + + // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...") + // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file + _backupFromDir = new File(qpidWork + "/bdbstore/" + TEST_VHOST + "-store"); + boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); + assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + FileUtils.delete(_backupToDir, true); + } + } + + public void testBackupAndRestoreMaintainsMessages() throws Exception + { + sendNumberedMessages(0, 10); + invokeBdbBackup(_backupFromDir, _backupToDir); + sendNumberedMessages(10, 20); + confirmBrokerHasMessages(0, 20); + stopBroker(); + + deleteStore(_backupFromDir); + replaceStoreWithBackup(_backupToDir, _backupFromDir); + + startBroker(); + confirmBrokerHasMessages(0, 10); + } + + private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + session.createConsumer(destination).close(); + + final int numOfMessages = endIndex - startIndex; + final int batchSize = 0; + sendMessage(session, destination, numOfMessages, startIndex, batchSize); + con.close(); + } + + private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = startIndex; i < endIndex; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + con.close(); + } + + private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception + { + if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows")) + { + BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); + } + else + { + runBdbBackupScript(backupFromDir, backupToDir); + } + } + + private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, + InterruptedException + { + Process backupProcess = null; + try + { + String qpidHome = System.getProperty(QPID_HOME); + ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); + pb.redirectErrorStream(true); + Map<String, String> env = pb.environment(); + env.put(QPID_HOME, qpidHome); + + LOGGER.debug("Backup command is " + pb.command()); + backupProcess = pb.start(); + Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); + piper.start(); + piper.await(2, TimeUnit.SECONDS); + backupProcess.waitFor(); + piper.join(); + + LOGGER.debug("Backup command completed " + backupProcess.exitValue()); + assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); + } + finally + { + if (backupProcess != null) + { + backupProcess.getErrorStream().close(); + backupProcess.getInputStream().close(); + backupProcess.getOutputStream().close(); + } + } + } + + private void replaceStoreWithBackup(File source, File dst) throws Exception + { + LOGGER.debug("Copying store " + source + " to " + dst); + FileUtils.copyRecursive(source, dst); + } + + private void deleteStore(File storeDir) + { + LOGGER.debug("Deleting store " + storeDir); + FileUtils.delete(storeDir, true); + } + +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java deleted file mode 100644 index f6344b3d7d..0000000000 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.testclient; - -import org.apache.log4j.Logger; - -import org.apache.qpid.ping.PingDurableClient; -import org.apache.qpid.server.store.berkeleydb.BDBBackup; -import org.apache.qpid.util.CommandLineParser; - -import java.util.Properties; - -/** - * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable - * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered - * messages were in the backup, in order to check that all are re-delivered when the backup is retored. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Perform BDB Backup on configurable message count. - * </table> - */ -public class BackupTestClient extends PingDurableClient -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(BackupTestClient.class); - - /** Holds the from directory to take backups from. */ - private String fromDir; - - /** Holds the to directory to store backups in. */ - private String toDir; - - /** - * Default constructor, passes all property overrides to the parent. - * - * @param overrides Any property overrides to apply to the defaults. - * - * @throws Exception Any underlying exception is allowed to fall through. - */ - BackupTestClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified - * on the command line: - * - * <p/><table><caption>Command Line</caption> - * <tr><th> Option <th> Comment - * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from. - * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to. - * </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the same command line format as BDBBackup utility, (compulsory from and to directories). - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), - System.getProperties()); - BackupTestClient pingProducer = new BackupTestClient(options); - - // Keep the from and to directories for backups. - pingProducer.fromDir = options.getProperty("fromdir"); - pingProducer.toDir = options.getProperty("todir"); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. - */ - public void takeAction() - { - BDBBackup backupUtil = new BDBBackup(); - backupUtil.takeBackupNoLock(fromDir, toDir); - System.out.println("Took backup of BDB log files from directory: " + fromDir); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java index 3eac80f175..555c4dd20d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java @@ -26,146 +26,117 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import javax.jms.Queue; import javax.jms.Session; import java.util.ArrayList; import java.util.List; public class PersistentStoreTest extends QpidBrokerTestCase { - private static final int NUM_MESSAGES = 100; private Connection _con; private Session _session; - private Queue _destination; - private MessageConsumer _consumer; + private Destination _destination; - public void setUp() throws Exception, JMSException + public void setUp() throws Exception { super.setUp(); _con = getConnection(); - _con.start(); - _session = _con.createSession(true, Session.SESSION_TRANSACTED); - _destination = _session.createQueue(getTestQueueName()); - _consumer = _session.createConsumer(_destination); - _consumer.close(); + } - sendMessage(_session, _destination, NUM_MESSAGES); - _session.commit(); + public void testCommittedMessagesSurviveBrokerNormalShutdown() throws Exception + { + sendAndCommitMessages(); + stopBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); } - /** Checks that a new consumer on a new connection can get NUM_MESSAGES from _destination */ - private void checkMessages() throws Exception, JMSException + public void testCommittedMessagesSurviveBrokerAbnormalShutdown() throws Exception { - _con = getConnection(); - _session = _con.createSession(false, Session.AUTO_ACKNOWLEDGE); - _con.start(); - _consumer = _session.createConsumer(_destination); - for (int i = 1; i <= NUM_MESSAGES; i++) - { - Message msg = _consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); - } - - Message msg = _consumer.receive(100); - if(msg != null) + if (isInternalBroker()) { - fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + return; } - } -// /** -// * starts the server, sends 100 messages, restarts the server and gets 100 messages back -// * the test formerly referred to as BDB-Qpid-1 -// * @throws Exception -// */ -// public void testStartStop() throws Exception -// { -// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1 -// checkMessages(); -// } + sendAndCommitMessages(); + killBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); + } - /** - * starts the server, sends 100 messages, nukes then starts the server and gets 100 messages back - * the test formerly referred to as BDB-Qpid-2 - * - * @throws Exception - */ - public void testForcibleStartStop() throws Exception + public void testCommittedMessagesSurviveBrokerNormalShutdownMidTransaction() throws Exception { - restartBroker(); - checkMessages(); + sendAndCommitMessages(); + sendMoreMessagesWithoutCommitting(); + stopBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); } -// /** -// * starts the server, sends 100 committed messages, 5 uncommited ones, -// * restarts the server and gets 100 messages back -// * the test formerly referred to as BDB-Qpid-5 -// * @throws Exception -// */ -// public void testStartStopMidTransaction() throws Exception -// { -// sendMessage(_session, _destination, 5); -// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1 -// checkMessages(); -// } + public void testCommittedMessagesSurviveBrokerAbnormalShutdownMidTransaction() throws Exception + { + if (isInternalBroker()) + { + return; + } + sendAndCommitMessages(); + sendMoreMessagesWithoutCommitting(); + killBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); + } - /** - * starts the server, sends 100 committed messages, 5 uncommited ones, - * nukes and starts the server and gets 100 messages back - * the test formerly referred to as BDB-Qpid-6 - * - * @throws Exception - */ - public void testForcibleStartStopMidTransaction() throws Exception + private void sendAndCommitMessages() throws Exception { - sendMessage(_session, _destination, 5); - //sync to ensure that the above messages have reached the broker - ((AMQSession) _session).sync(); - restartBroker(); - checkMessages(); + _session = _con.createSession(true, Session.SESSION_TRANSACTED); + _destination = _session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + _session.createConsumer(_destination).close(); + + sendMessage(_session, _destination, NUM_MESSAGES); + _session.commit(); } - /** - * starts the server, sends 100 committed messages, 5 uncommited ones, - * restarts the client and gets 100 messages back. - * the test formerly referred to as BDB-Qpid-7 - * - * FIXME: is this a PersistentStoreTest? Seems more like a transaction test to me.. aidan - * - * @throws Exception - */ - public void testClientDeathMidTransaction() throws Exception + private void sendMoreMessagesWithoutCommitting() throws Exception { sendMessage(_session, _destination, 5); - _con.close(); - checkMessages(); + // sync to ensure that messages have reached the broker + ((AMQSession<?,?>) _session).sync(); } -// /** -// * starts the server, sends 50 committed messages, copies $QPID_WORK to a new location, -// * sends 10 messages, stops the server, nukes the store, restores the copy, starts the server -// * checks that we get the first 50 back. -// */ -// public void testHotBackup() -// { -// -- removing as this will leave 100msgs on a queue -// } + private void confirmBrokerStillHasCommittedMessages() throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 1; i <= NUM_MESSAGES; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + } /** - * This test requires that we can send messages without commiting. + * This test requires that we can send messages without committing. * QTC always commits the messages sent via sendMessages. * * @param session the session to use for sending * @param destination where to send them to * @param count no. of messages to send * - * @return the sent messges + * @return the sent messages * * @throws Exception */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java index 8345803d56..66b3fe0c6a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java @@ -22,5 +22,7 @@ package org.apache.qpid.test.utils; public interface BrokerHolder { + String getWorkingDirectory(); void shutdown(); + void kill(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 93e71a8cbe..f6c481431a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -34,8 +34,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase public static final long DEFAULT_FAILOVER_TIME = 10000L; - protected int failingPort; - protected void setUp() throws java.lang.Exception { super.setUp(); @@ -66,15 +64,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase return _connectionFactory; } - @Override - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } - } - public void tearDown() throws Exception { try @@ -90,11 +79,11 @@ public class FailoverBaseCase extends QpidBrokerTestCase } } - public void failBroker(int port) { try { + //TODO: use killBroker instead stopBroker(port); } catch (Exception e) @@ -102,6 +91,4 @@ public class FailoverBaseCase extends QpidBrokerTestCase throw new RuntimeException(e); } } - - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index d394430079..adda9ca3ec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -27,9 +27,11 @@ import org.apache.qpid.server.Broker; public class InternalBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(InternalBrokerHolder.class); + private final Broker _broker; + private final String _workingDirectory; - public InternalBrokerHolder(final Broker broker) + public InternalBrokerHolder(final Broker broker, String workingDirectory) { if(broker == null) { @@ -37,6 +39,13 @@ public class InternalBrokerHolder implements BrokerHolder } _broker = broker; + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() @@ -48,4 +57,12 @@ public class InternalBrokerHolder implements BrokerHolder LOGGER.info("Broker instance shutdown"); } + @Override + public void kill() + { + // Can't kill a internal broker as we would also kill ourselves as we share the same JVM. + shutdown(); + } + + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java new file mode 100644 index 0000000000..9413e38606 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +public final class Piper extends Thread +{ + private static final Logger LOGGER = Logger.getLogger(Piper.class); + + private final BufferedReader _in; + private final PrintStream _out; + private final String _ready; + private final CountDownLatch _latch; + private final String _stopped; + private final String _prefix; + private volatile boolean _seenReady; + private volatile String _stopLine; + + public Piper(InputStream in, PrintStream out, String ready, String stopped) + { + this(in, out, ready, stopped, null); + } + + public Piper(InputStream in, PrintStream out, String ready, String stopped, String prefix) + { + _in = new BufferedReader(new InputStreamReader(in)); + _out = out; + _ready = ready; + _stopped = stopped; + _seenReady = false; + _prefix = prefix; + + if (this._ready != null && !this._ready.equals("")) + { + this._latch = new CountDownLatch(1); + } + else + { + this._latch = null; + } + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + if (_latch == null) + { + return true; + } + else + { + _latch.await(timeout, unit); + return _seenReady; + } + } + + public void run() + { + try + { + String line; + while ((line = _in.readLine()) != null) + { + if (_prefix != null) + { + line = _prefix + line; + } + _out.println(line); + + if (_latch != null && line.contains(_ready)) + { + _seenReady = true; + _latch.countDown(); + } + + if (!_seenReady && line.contains(_stopped)) + { + _stopLine = line; + } + } + } + catch (IOException e) + { + LOGGER.warn(e.getMessage() + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost."); + } + finally + { + if (_latch != null) + { + _latch.countDown(); + } + } + } + + public String getStopLine() + { + return _stopLine; + } + + String getReady() + { + return _ready; + } +}
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 6311322522..32c6094adb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -59,9 +59,6 @@ import javax.naming.NamingException; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.LineNumberReader; import java.io.PrintStream; import java.net.MalformedURLException; import java.net.URL; @@ -69,7 +66,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -126,7 +122,6 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; - // values protected static final String JAVA = "java"; @@ -154,7 +149,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected File _outputFile; - protected PrintStream _brokerOutputStream; + protected PrintStream _testcaseOutputStream; protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>(); @@ -195,10 +190,10 @@ public class QpidBrokerTestCase extends QpidTestCase super(); } - public Logger getLogger() - { - return QpidBrokerTestCase._logger; - } + public Logger getLogger() + { + return QpidBrokerTestCase._logger; + } public void runBare() throws Throwable { @@ -228,12 +223,12 @@ public class QpidBrokerTestCase extends QpidTestCase if (_interleaveBrokerLog) { - _brokerOutputStream = out; + _testcaseOutputStream = out; } else { - _brokerOutputStream = new PrintStream(new FileOutputStream(String - .format("%s/TEST-%s.broker.out", _output, qname)), true); + _testcaseOutputStream = new PrintStream(new FileOutputStream(String + .format("%s/TEST-%s.broker.out", _output, qname)), true); } } @@ -278,7 +273,7 @@ public class QpidBrokerTestCase extends QpidTestCase out.close(); if (!_interleaveBrokerLog) { - _brokerOutputStream.close(); + _testcaseOutputStream.close(); } } } @@ -307,108 +302,6 @@ public class QpidBrokerTestCase extends QpidTestCase startBroker(); } - private static final class Piper extends Thread - { - - private LineNumberReader in; - private PrintStream out; - private String ready; - private CountDownLatch latch; - private boolean seenReady; - private String stopped; - private String stopLine; - - public Piper(InputStream in, PrintStream out, String ready) - { - this(in, out, ready, null); - } - - public Piper(InputStream in, PrintStream out, String ready, String stopped) - { - this.in = new LineNumberReader(new InputStreamReader(in)); - this.out = out; - this.ready = ready; - this.stopped = stopped; - this.seenReady = false; - - if (this.getReady() != null && !this.getReady().equals("")) - { - this.latch = new CountDownLatch(1); - } - else - { - this.latch = null; - } - } - - public Piper(InputStream in, PrintStream out) - { - this(in, out, null); - } - - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - if (latch == null) - { - return true; - } - else - { - latch.await(timeout, unit); - return seenReady; - } - } - - public void run() - { - try - { - String line; - while ((line = in.readLine()) != null) - { - if (_interleaveBrokerLog) - { - line = _brokerLogPrefix + line; - } - out.println(line); - - if (latch != null && line.contains(getReady())) - { - seenReady = true; - latch.countDown(); - } - - if (!seenReady && line.contains(stopped)) - { - stopLine = line; - } - } - } - catch (IOException e) - { - // this seems to happen regularly even when - // exits are normal - } - finally - { - if (latch != null) - { - latch.countDown(); - } - } - } - - public String getStopLine() - { - return stopLine; - } - - public String getReady() - { - return ready; - } - } - /** * Return the management port in use by the broker on this main port * @@ -494,7 +387,7 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("starting internal broker (same JVM)"); broker.startup(options); - _brokers.put(port, new InternalBrokerHolder(broker)); + _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"))); } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { @@ -568,12 +461,13 @@ public class QpidBrokerTestCase extends QpidTestCase // cpp broker requires that the work directory is created createBrokerWork(qpidWork); - Process process = pb.start();; + Process process = pb.start(); Piper p = new Piper(process.getInputStream(), - _brokerOutputStream, + _testcaseOutputStream, System.getProperty(BROKER_READY), - System.getProperty(BROKER_STOPPED)); + System.getProperty(BROKER_STOPPED), + _interleaveBrokerLog ? _brokerLogPrefix : null); p.start(); @@ -600,7 +494,7 @@ public class QpidBrokerTestCase extends QpidTestCase // this is expect if the broker started successfully } - _brokers.put(port, new SpawnedBrokerHolder(process)); + _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork)); } } @@ -747,11 +641,31 @@ public class QpidBrokerTestCase extends QpidTestCase public void stopBroker(int port) throws Exception { - port = getPort(port); + if (isBrokerPresent(port)) + { + port = getPort(port); - _logger.info("stopping broker on port : " + port); - BrokerHolder broker = _brokers.remove(port); - broker.shutdown(); + _logger.info("stopping broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.shutdown(); + } + } + + public void killBroker() throws Exception + { + killBroker(0); + } + + public void killBroker(int port) throws Exception + { + if (isBrokerPresent(port)) + { + port = getPort(port); + + _logger.info("killing broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.kill(); + } } public boolean isBrokerPresent(int port) throws Exception @@ -760,7 +674,13 @@ public class QpidBrokerTestCase extends QpidTestCase return _brokers.containsKey(port); } - + + public BrokerHolder getBroker(int port) throws Exception + { + port = getPort(port); + return _brokers.get(port); + } + /** * Attempt to set the Java Broker to use the BDBMessageStore for persistence * Falling back to the DerbyMessageStore if @@ -989,7 +909,6 @@ public class QpidBrokerTestCase extends QpidTestCase /** * we assume that the environment is correctly set * i.e. -Djava.naming.provider.url="..//example010.properties" - * TODO should be a way of setting that through maven * * @return an initial context * @@ -1163,13 +1082,13 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Send messages to the given destination. * - * If session is transacted then messages will be commited before returning + * If session is transacted then messages will be committed before returning * * @param session the session to use for sending * @param destination where to send them to * @param count no. of messages to send * - * @return the sent messges + * @return the sent messages * * @throws Exception */ @@ -1362,6 +1281,6 @@ public class QpidBrokerTestCase extends QpidTestCase protected int getFailingPort() { - return FAILING_PORT; + return FAILING_PORT; } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java index 7946c6a6d1..83294c13ad 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.utils; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -225,4 +226,34 @@ public class ReflectionUtils throw new ReflectionUtilsException("NoSuchMethodException", e); } } + + @SuppressWarnings("unchecked") + public static <T> T getDeclaredField(final Object obj, final String fieldName) + { + try + { + final Field field = obj.getClass().getDeclaredField(fieldName); + if (!field.isAccessible()) + { + field.setAccessible(true); + } + return (T) field.get(obj); + } + catch (NoSuchFieldException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (SecurityException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalArgumentException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalAccessException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java index 65239bbe02..50b1ea7cea 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java @@ -20,15 +20,20 @@ */ package org.apache.qpid.test.utils; +import java.io.IOException; + import org.apache.log4j.Logger; public class SpawnedBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(SpawnedBrokerHolder.class); + private final boolean _isWindows = String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows"); private final Process _process; + private final Integer _pid; + private final String _workingDirectory; - public SpawnedBrokerHolder(final Process process) + public SpawnedBrokerHolder(final Process process, final String workingDirectory) { if(process == null) { @@ -36,14 +41,87 @@ public class SpawnedBrokerHolder implements BrokerHolder } _process = process; + _pid = retrieveUnixPidIfPossible(); + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() { LOGGER.info("Destroying broker process"); - _process.destroy(); + reapChildProcess(); + } + + @Override + public void kill() + { + if (_pid == null) + { + LOGGER.info("Destroying broker process"); + _process.destroy(); + } + else + { + LOGGER.info("Killing broker process with PID " + _pid); + sendSigkillForImmediateShutdown(_pid); + } + + reapChildProcess(); + } + + private void sendSigkillForImmediateShutdown(Integer pid) + { + boolean killSuccessful = false; + try + { + final Process killProcess = Runtime.getRuntime().exec("kill -KILL " + pid); + killProcess.waitFor(); + killSuccessful = killProcess.exitValue() == 0; + } + catch (IOException e) + { + LOGGER.error("Error whilst killing process " + _pid, e); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + finally + { + if (!killSuccessful) + { + _process.destroy(); + } + } + } + + private Integer retrieveUnixPidIfPossible() + { + if(!_isWindows) + { + try + { + Integer pid = ReflectionUtils.getDeclaredField(_process, "pid"); + LOGGER.info("PID " + pid); + return pid; + } + catch (ReflectionUtilsException e) + { + LOGGER.warn("Could not get pid for process, Broker process shutdown will be ungraceful"); + } + } + return null; + } + + private void reapChildProcess() + { try { _process.waitFor(); @@ -51,8 +129,21 @@ public class SpawnedBrokerHolder implements BrokerHolder } catch (InterruptedException e) { - LOGGER.error("Interrupted whilst waiting for process destruction"); + LOGGER.error("Interrupted whilst waiting for process shutdown"); Thread.currentThread().interrupt(); } + finally + { + try + { + _process.getInputStream().close(); + _process.getErrorStream().close(); + _process.getOutputStream().close(); + } + catch (IOException e) + { + } + } } + } diff --git a/qpid/java/test-profiles/JavaDerbyExcludes b/qpid/java/test-profiles/JavaDerbyExcludes index 3caa360d48..931a0b0ddb 100644 --- a/qpid/java/test-profiles/JavaDerbyExcludes +++ b/qpid/java/test-profiles/JavaDerbyExcludes @@ -19,3 +19,4 @@ org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#* org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#* +org.apache.qpid.server.store.berkeleydb.BDBBackupTest#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 67190a6fcc..b4e583ba3a 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -34,3 +34,4 @@ org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#* org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#* +org.apache.qpid.server.store.berkeleydb.BDBBackupTest#* |
