diff options
| author | Keith Wall <kwall@apache.org> | 2012-01-28 00:05:24 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-01-28 00:05:24 +0000 |
| commit | 8d3200e6c7808b7b9798f278e4682b9dca58d5cc (patch) | |
| tree | 52f46fe725520bf77a342adaa63d4053a07a2dd6 /java | |
| parent | 4dd9cbaf7fdc498a4eb5f2652d88afd20fe5d530 (diff) | |
| download | qpid-python-8d3200e6c7808b7b9798f278e4682b9dca58d5cc.tar.gz | |
QPID-3775: Automate the manual persistent store tests.
Add new testcase BDBBackupTest to test the operation of the BDB store backup mechanism (backup.sh).
Changed test case PersistentStoreTest to implement the manually scripted BDB tests.
This required changes to QBTC and BrokerHolder to allow an external Brokers to be forcibly kill'd with -9.
Remove script and class for manual tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1236931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
15 files changed, 590 insertions, 411 deletions
diff --git a/java/bdbstore/bin/backup.sh b/java/bdbstore/bin/backup.sh index b58ab16282..75f9e1d968 100755 --- a/java/bdbstore/bin/backup.sh +++ b/java/bdbstore/bin/backup.sh @@ -33,7 +33,7 @@ if [ -z "$QPID_HOME" ]; then fi VERSION=0.15 -LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar +LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar echo "Starting Hot Backup Script" diff --git a/java/bdbstore/bin/storeUpgrade.sh b/java/bdbstore/bin/storeUpgrade.sh index ffb33f7fbd..dd53529a22 100755 --- a/java/bdbstore/bin/storeUpgrade.sh +++ b/java/bdbstore/bin/storeUpgrade.sh @@ -34,6 +34,6 @@ fi VERSION=0.15 -LIBS=$QPID_HOME/lib/opt/je-4.0.117.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar +LIBS=$QPID_HOME/lib/opt/je-5.0.34.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar:$QPID_HOME/lib/qpid-all.jar java -Xms256m -Dlog4j.configuration=BDBStoreUpgrade.log4j.xml -Xmx256m -Damqj.logging.level=warn ${JAVA_OPTS} -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreUpgrade ${ARGS} diff --git a/java/bdbstore/etc/scripts/bdbbackuptest.sh b/java/bdbstore/etc/scripts/bdbbackuptest.sh deleted file mode 100755 index 4224f98de2..0000000000 --- a/java/bdbstore/etc/scripts/bdbbackuptest.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -# Parse arguements taking all - prefixed args as JAVA_OPTS -for arg in "$@"; do - if [[ $arg == -java:* ]]; then - JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " - else - ARGS="${ARGS}$arg " - fi -done - -VERSION=0.15 - -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.6.1.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS - -. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS} - diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java new file mode 100644 index 0000000000..342c185b99 --- /dev/null +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.Piper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; + +/** + * Tests the BDB backup script can successfully perform a backup and that + * backup can be restored and used by the Broker. + */ +public class BDBBackupTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(BDBBackupTest.class); + + private static final String BACKUP_SCRIPT = "/bin/backup.sh"; + private static final String BACKUP_COMPLETE_MESSAGE = "Hot Backup Completed"; + + private static final String TEST_VHOST = "test"; + private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); + + private File _backupToDir; + private File _backupFromDir; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); + _backupToDir.mkdirs(); + + final String qpidWork = getBroker(DEFAULT_PORT).getWorkingDirectory(); + + // It would be preferable to lookup the store path using #getConfigurationStringProperty("virtualhosts...") + // but the config as known to QBTC does not pull-in the virtualhost section from its separate source file + _backupFromDir = new File(qpidWork + "/bdbstore/" + TEST_VHOST + "-store"); + boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); + assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + FileUtils.delete(_backupToDir, true); + } + } + + public void testBackupAndRestoreMaintainsMessages() throws Exception + { + sendNumberedMessages(0, 10); + invokeBdbBackup(_backupFromDir, _backupToDir); + sendNumberedMessages(10, 20); + confirmBrokerHasMessages(0, 20); + stopBroker(); + + deleteStore(_backupFromDir); + replaceStoreWithBackup(_backupToDir, _backupFromDir); + + startBroker(); + confirmBrokerHasMessages(0, 10); + } + + private void sendNumberedMessages(final int startIndex, final int endIndex) throws JMSException, Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + session.createConsumer(destination).close(); + + final int numOfMessages = endIndex - startIndex; + final int batchSize = 0; + sendMessage(session, destination, numOfMessages, startIndex, batchSize); + con.close(); + } + + private void confirmBrokerHasMessages(final int startIndex, final int endIndex) throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = startIndex; i < endIndex; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + con.close(); + } + + private void invokeBdbBackup(final File backupFromDir, final File backupToDir) throws Exception + { + if (String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows")) + { + BDBBackup.main(new String[]{"-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()}); + } + else + { + runBdbBackupScript(backupFromDir, backupToDir); + } + } + + private void runBdbBackupScript(final File backupFromDir, final File backupToDir) throws IOException, + InterruptedException + { + Process backupProcess = null; + try + { + String qpidHome = System.getProperty(QPID_HOME); + ProcessBuilder pb = new ProcessBuilder(qpidHome + BACKUP_SCRIPT, "-todir", backupToDir.getAbsolutePath(), "-fromdir", backupFromDir.getAbsolutePath()); + pb.redirectErrorStream(true); + Map<String, String> env = pb.environment(); + env.put(QPID_HOME, qpidHome); + + LOGGER.debug("Backup command is " + pb.command()); + backupProcess = pb.start(); + Piper piper = new Piper(backupProcess.getInputStream(), _testcaseOutputStream, null, BACKUP_COMPLETE_MESSAGE); + piper.start(); + piper.await(2, TimeUnit.SECONDS); + backupProcess.waitFor(); + piper.join(); + + LOGGER.debug("Backup command completed " + backupProcess.exitValue()); + assertEquals("Unexpected exit value from backup script", 0, backupProcess.exitValue()); + } + finally + { + if (backupProcess != null) + { + backupProcess.getErrorStream().close(); + backupProcess.getInputStream().close(); + backupProcess.getOutputStream().close(); + } + } + } + + private void replaceStoreWithBackup(File source, File dst) throws Exception + { + LOGGER.debug("Copying store " + source + " to " + dst); + FileUtils.copyRecursive(source, dst); + } + + private void deleteStore(File storeDir) + { + LOGGER.debug("Deleting store " + storeDir); + FileUtils.delete(storeDir, true); + } + +} diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java deleted file mode 100644 index f6344b3d7d..0000000000 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclient/BackupTestClient.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb.testclient; - -import org.apache.log4j.Logger; - -import org.apache.qpid.ping.PingDurableClient; -import org.apache.qpid.server.store.berkeleydb.BDBBackup; -import org.apache.qpid.util.CommandLineParser; - -import java.util.Properties; - -/** - * BackupTestClient extends {@link PingDurableClient} with an action that takes a BDB backup when a configurable - * message count is reached. This enables a test user to restore this beackup, knowing how many committed but undelivered - * messages were in the backup, in order to check that all are re-delivered when the backup is retored. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Perform BDB Backup on configurable message count. - * </table> - */ -public class BackupTestClient extends PingDurableClient -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(BackupTestClient.class); - - /** Holds the from directory to take backups from. */ - private String fromDir; - - /** Holds the to directory to store backups in. */ - private String toDir; - - /** - * Default constructor, passes all property overrides to the parent. - * - * @param overrides Any property overrides to apply to the defaults. - * - * @throws Exception Any underlying exception is allowed to fall through. - */ - BackupTestClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. From and to directory locations for the BDB backups must be specified - * on the command line: - * - * <p/><table><caption>Command Line</caption> - * <tr><th> Option <th> Comment - * <tr><td> -fromdir <td> The path to the directory to back the bdb log file from. - * <tr><td> -todir <td> The path to the directory to save the backed up bdb log files to. - * </table> - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the same command line format as BDBBackup utility, (compulsory from and to directories). - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(BDBBackup.COMMAND_LINE_SPEC), - System.getProperties()); - BackupTestClient pingProducer = new BackupTestClient(options); - - // Keep the from and to directories for backups. - pingProducer.fromDir = options.getProperty("fromdir"); - pingProducer.toDir = options.getProperty("todir"); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Supplies a triggered action extension, based on message count. This action takes a BDB log file backup. - */ - public void takeAction() - { - BDBBackup backupUtil = new BDBBackup(); - backupUtil.takeBackupNoLock(fromDir, toDir); - System.out.println("Took backup of BDB log files from directory: " + fromDir); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java index 3eac80f175..555c4dd20d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java @@ -26,146 +26,117 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import javax.jms.Queue; import javax.jms.Session; import java.util.ArrayList; import java.util.List; public class PersistentStoreTest extends QpidBrokerTestCase { - private static final int NUM_MESSAGES = 100; private Connection _con; private Session _session; - private Queue _destination; - private MessageConsumer _consumer; + private Destination _destination; - public void setUp() throws Exception, JMSException + public void setUp() throws Exception { super.setUp(); _con = getConnection(); - _con.start(); - _session = _con.createSession(true, Session.SESSION_TRANSACTED); - _destination = _session.createQueue(getTestQueueName()); - _consumer = _session.createConsumer(_destination); - _consumer.close(); + } - sendMessage(_session, _destination, NUM_MESSAGES); - _session.commit(); + public void testCommittedMessagesSurviveBrokerNormalShutdown() throws Exception + { + sendAndCommitMessages(); + stopBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); } - /** Checks that a new consumer on a new connection can get NUM_MESSAGES from _destination */ - private void checkMessages() throws Exception, JMSException + public void testCommittedMessagesSurviveBrokerAbnormalShutdown() throws Exception { - _con = getConnection(); - _session = _con.createSession(false, Session.AUTO_ACKNOWLEDGE); - _con.start(); - _consumer = _session.createConsumer(_destination); - for (int i = 1; i <= NUM_MESSAGES; i++) - { - Message msg = _consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message " + i + " not received", msg); - assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); - } - - Message msg = _consumer.receive(100); - if(msg != null) + if (isInternalBroker()) { - fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + return; } - } -// /** -// * starts the server, sends 100 messages, restarts the server and gets 100 messages back -// * the test formerly referred to as BDB-Qpid-1 -// * @throws Exception -// */ -// public void testStartStop() throws Exception -// { -// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1 -// checkMessages(); -// } + sendAndCommitMessages(); + killBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); + } - /** - * starts the server, sends 100 messages, nukes then starts the server and gets 100 messages back - * the test formerly referred to as BDB-Qpid-2 - * - * @throws Exception - */ - public void testForcibleStartStop() throws Exception + public void testCommittedMessagesSurviveBrokerNormalShutdownMidTransaction() throws Exception { - restartBroker(); - checkMessages(); + sendAndCommitMessages(); + sendMoreMessagesWithoutCommitting(); + stopBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); } -// /** -// * starts the server, sends 100 committed messages, 5 uncommited ones, -// * restarts the server and gets 100 messages back -// * the test formerly referred to as BDB-Qpid-5 -// * @throws Exception -// */ -// public void testStartStopMidTransaction() throws Exception -// { -// sendMessage(_session, _destination, 5); -// restartBroker(); -- Not Currently a gracefull restart so not BDB-Qpid-1 -// checkMessages(); -// } + public void testCommittedMessagesSurviveBrokerAbnormalShutdownMidTransaction() throws Exception + { + if (isInternalBroker()) + { + return; + } + sendAndCommitMessages(); + sendMoreMessagesWithoutCommitting(); + killBroker(); + startBroker(); + confirmBrokerStillHasCommittedMessages(); + } - /** - * starts the server, sends 100 committed messages, 5 uncommited ones, - * nukes and starts the server and gets 100 messages back - * the test formerly referred to as BDB-Qpid-6 - * - * @throws Exception - */ - public void testForcibleStartStopMidTransaction() throws Exception + private void sendAndCommitMessages() throws Exception { - sendMessage(_session, _destination, 5); - //sync to ensure that the above messages have reached the broker - ((AMQSession) _session).sync(); - restartBroker(); - checkMessages(); + _session = _con.createSession(true, Session.SESSION_TRANSACTED); + _destination = _session.createQueue(getTestQueueName()); + // Create queue by consumer side-effect + _session.createConsumer(_destination).close(); + + sendMessage(_session, _destination, NUM_MESSAGES); + _session.commit(); } - /** - * starts the server, sends 100 committed messages, 5 uncommited ones, - * restarts the client and gets 100 messages back. - * the test formerly referred to as BDB-Qpid-7 - * - * FIXME: is this a PersistentStoreTest? Seems more like a transaction test to me.. aidan - * - * @throws Exception - */ - public void testClientDeathMidTransaction() throws Exception + private void sendMoreMessagesWithoutCommitting() throws Exception { sendMessage(_session, _destination, 5); - _con.close(); - checkMessages(); + // sync to ensure that messages have reached the broker + ((AMQSession<?,?>) _session).sync(); } -// /** -// * starts the server, sends 50 committed messages, copies $QPID_WORK to a new location, -// * sends 10 messages, stops the server, nukes the store, restores the copy, starts the server -// * checks that we get the first 50 back. -// */ -// public void testHotBackup() -// { -// -- removing as this will leave 100msgs on a queue -// } + private void confirmBrokerStillHasCommittedMessages() throws Exception + { + Connection con = getConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + con.start(); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 1; i <= NUM_MESSAGES; i++) + { + Message msg = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " not received", msg); + assertEquals("Did not receive the expected message", i, msg.getIntProperty(INDEX)); + } + + Message msg = consumer.receive(100); + if(msg != null) + { + fail("No more messages should be received, but received additional message with index: " + msg.getIntProperty(INDEX)); + } + } /** - * This test requires that we can send messages without commiting. + * This test requires that we can send messages without committing. * QTC always commits the messages sent via sendMessages. * * @param session the session to use for sending * @param destination where to send them to * @param count no. of messages to send * - * @return the sent messges + * @return the sent messages * * @throws Exception */ diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java index 8345803d56..66b3fe0c6a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java @@ -22,5 +22,7 @@ package org.apache.qpid.test.utils; public interface BrokerHolder { + String getWorkingDirectory(); void shutdown(); + void kill(); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 93e71a8cbe..f6c481431a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -34,8 +34,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase public static final long DEFAULT_FAILOVER_TIME = 10000L; - protected int failingPort; - protected void setUp() throws java.lang.Exception { super.setUp(); @@ -66,15 +64,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase return _connectionFactory; } - @Override - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } - } - public void tearDown() throws Exception { try @@ -90,11 +79,11 @@ public class FailoverBaseCase extends QpidBrokerTestCase } } - public void failBroker(int port) { try { + //TODO: use killBroker instead stopBroker(port); } catch (Exception e) @@ -102,6 +91,4 @@ public class FailoverBaseCase extends QpidBrokerTestCase throw new RuntimeException(e); } } - - } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index d394430079..adda9ca3ec 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -27,9 +27,11 @@ import org.apache.qpid.server.Broker; public class InternalBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(InternalBrokerHolder.class); + private final Broker _broker; + private final String _workingDirectory; - public InternalBrokerHolder(final Broker broker) + public InternalBrokerHolder(final Broker broker, String workingDirectory) { if(broker == null) { @@ -37,6 +39,13 @@ public class InternalBrokerHolder implements BrokerHolder } _broker = broker; + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() @@ -48,4 +57,12 @@ public class InternalBrokerHolder implements BrokerHolder LOGGER.info("Broker instance shutdown"); } + @Override + public void kill() + { + // Can't kill a internal broker as we would also kill ourselves as we share the same JVM. + shutdown(); + } + + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java b/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java new file mode 100644 index 0000000000..9413e38606 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/Piper.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; + +public final class Piper extends Thread +{ + private static final Logger LOGGER = Logger.getLogger(Piper.class); + + private final BufferedReader _in; + private final PrintStream _out; + private final String _ready; + private final CountDownLatch _latch; + private final String _stopped; + private final String _prefix; + private volatile boolean _seenReady; + private volatile String _stopLine; + + public Piper(InputStream in, PrintStream out, String ready, String stopped) + { + this(in, out, ready, stopped, null); + } + + public Piper(InputStream in, PrintStream out, String ready, String stopped, String prefix) + { + _in = new BufferedReader(new InputStreamReader(in)); + _out = out; + _ready = ready; + _stopped = stopped; + _seenReady = false; + _prefix = prefix; + + if (this._ready != null && !this._ready.equals("")) + { + this._latch = new CountDownLatch(1); + } + else + { + this._latch = null; + } + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + if (_latch == null) + { + return true; + } + else + { + _latch.await(timeout, unit); + return _seenReady; + } + } + + public void run() + { + try + { + String line; + while ((line = _in.readLine()) != null) + { + if (_prefix != null) + { + line = _prefix + line; + } + _out.println(line); + + if (_latch != null && line.contains(_ready)) + { + _seenReady = true; + _latch.countDown(); + } + + if (!_seenReady && line.contains(_stopped)) + { + _stopLine = line; + } + } + } + catch (IOException e) + { + LOGGER.warn(e.getMessage() + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost."); + } + finally + { + if (_latch != null) + { + _latch.countDown(); + } + } + } + + public String getStopLine() + { + return _stopLine; + } + + String getReady() + { + return _ready; + } +}
\ No newline at end of file diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 6311322522..32c6094adb 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -59,9 +59,6 @@ import javax.naming.NamingException; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.LineNumberReader; import java.io.PrintStream; import java.net.MalformedURLException; import java.net.URL; @@ -69,7 +66,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -126,7 +122,6 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; - // values protected static final String JAVA = "java"; @@ -154,7 +149,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected File _outputFile; - protected PrintStream _brokerOutputStream; + protected PrintStream _testcaseOutputStream; protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>(); @@ -195,10 +190,10 @@ public class QpidBrokerTestCase extends QpidTestCase super(); } - public Logger getLogger() - { - return QpidBrokerTestCase._logger; - } + public Logger getLogger() + { + return QpidBrokerTestCase._logger; + } public void runBare() throws Throwable { @@ -228,12 +223,12 @@ public class QpidBrokerTestCase extends QpidTestCase if (_interleaveBrokerLog) { - _brokerOutputStream = out; + _testcaseOutputStream = out; } else { - _brokerOutputStream = new PrintStream(new FileOutputStream(String - .format("%s/TEST-%s.broker.out", _output, qname)), true); + _testcaseOutputStream = new PrintStream(new FileOutputStream(String + .format("%s/TEST-%s.broker.out", _output, qname)), true); } } @@ -278,7 +273,7 @@ public class QpidBrokerTestCase extends QpidTestCase out.close(); if (!_interleaveBrokerLog) { - _brokerOutputStream.close(); + _testcaseOutputStream.close(); } } } @@ -307,108 +302,6 @@ public class QpidBrokerTestCase extends QpidTestCase startBroker(); } - private static final class Piper extends Thread - { - - private LineNumberReader in; - private PrintStream out; - private String ready; - private CountDownLatch latch; - private boolean seenReady; - private String stopped; - private String stopLine; - - public Piper(InputStream in, PrintStream out, String ready) - { - this(in, out, ready, null); - } - - public Piper(InputStream in, PrintStream out, String ready, String stopped) - { - this.in = new LineNumberReader(new InputStreamReader(in)); - this.out = out; - this.ready = ready; - this.stopped = stopped; - this.seenReady = false; - - if (this.getReady() != null && !this.getReady().equals("")) - { - this.latch = new CountDownLatch(1); - } - else - { - this.latch = null; - } - } - - public Piper(InputStream in, PrintStream out) - { - this(in, out, null); - } - - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { - if (latch == null) - { - return true; - } - else - { - latch.await(timeout, unit); - return seenReady; - } - } - - public void run() - { - try - { - String line; - while ((line = in.readLine()) != null) - { - if (_interleaveBrokerLog) - { - line = _brokerLogPrefix + line; - } - out.println(line); - - if (latch != null && line.contains(getReady())) - { - seenReady = true; - latch.countDown(); - } - - if (!seenReady && line.contains(stopped)) - { - stopLine = line; - } - } - } - catch (IOException e) - { - // this seems to happen regularly even when - // exits are normal - } - finally - { - if (latch != null) - { - latch.countDown(); - } - } - } - - public String getStopLine() - { - return stopLine; - } - - public String getReady() - { - return ready; - } - } - /** * Return the management port in use by the broker on this main port * @@ -494,7 +387,7 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("starting internal broker (same JVM)"); broker.startup(options); - _brokers.put(port, new InternalBrokerHolder(broker)); + _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"))); } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { @@ -568,12 +461,13 @@ public class QpidBrokerTestCase extends QpidTestCase // cpp broker requires that the work directory is created createBrokerWork(qpidWork); - Process process = pb.start();; + Process process = pb.start(); Piper p = new Piper(process.getInputStream(), - _brokerOutputStream, + _testcaseOutputStream, System.getProperty(BROKER_READY), - System.getProperty(BROKER_STOPPED)); + System.getProperty(BROKER_STOPPED), + _interleaveBrokerLog ? _brokerLogPrefix : null); p.start(); @@ -600,7 +494,7 @@ public class QpidBrokerTestCase extends QpidTestCase // this is expect if the broker started successfully } - _brokers.put(port, new SpawnedBrokerHolder(process)); + _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork)); } } @@ -747,11 +641,31 @@ public class QpidBrokerTestCase extends QpidTestCase public void stopBroker(int port) throws Exception { - port = getPort(port); + if (isBrokerPresent(port)) + { + port = getPort(port); - _logger.info("stopping broker on port : " + port); - BrokerHolder broker = _brokers.remove(port); - broker.shutdown(); + _logger.info("stopping broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.shutdown(); + } + } + + public void killBroker() throws Exception + { + killBroker(0); + } + + public void killBroker(int port) throws Exception + { + if (isBrokerPresent(port)) + { + port = getPort(port); + + _logger.info("killing broker on port : " + port); + BrokerHolder broker = _brokers.remove(port); + broker.kill(); + } } public boolean isBrokerPresent(int port) throws Exception @@ -760,7 +674,13 @@ public class QpidBrokerTestCase extends QpidTestCase return _brokers.containsKey(port); } - + + public BrokerHolder getBroker(int port) throws Exception + { + port = getPort(port); + return _brokers.get(port); + } + /** * Attempt to set the Java Broker to use the BDBMessageStore for persistence * Falling back to the DerbyMessageStore if @@ -989,7 +909,6 @@ public class QpidBrokerTestCase extends QpidTestCase /** * we assume that the environment is correctly set * i.e. -Djava.naming.provider.url="..//example010.properties" - * TODO should be a way of setting that through maven * * @return an initial context * @@ -1163,13 +1082,13 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Send messages to the given destination. * - * If session is transacted then messages will be commited before returning + * If session is transacted then messages will be committed before returning * * @param session the session to use for sending * @param destination where to send them to * @param count no. of messages to send * - * @return the sent messges + * @return the sent messages * * @throws Exception */ @@ -1362,6 +1281,6 @@ public class QpidBrokerTestCase extends QpidTestCase protected int getFailingPort() { - return FAILING_PORT; + return FAILING_PORT; } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java index 7946c6a6d1..83294c13ad 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/ReflectionUtils.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.utils; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -225,4 +226,34 @@ public class ReflectionUtils throw new ReflectionUtilsException("NoSuchMethodException", e); } } + + @SuppressWarnings("unchecked") + public static <T> T getDeclaredField(final Object obj, final String fieldName) + { + try + { + final Field field = obj.getClass().getDeclaredField(fieldName); + if (!field.isAccessible()) + { + field.setAccessible(true); + } + return (T) field.get(obj); + } + catch (NoSuchFieldException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (SecurityException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalArgumentException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + catch (IllegalAccessException e) + { + throw new ReflectionUtilsException("Unable to read field " + fieldName + "from object " + obj, e); + } + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java index 65239bbe02..50b1ea7cea 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java @@ -20,15 +20,20 @@ */ package org.apache.qpid.test.utils; +import java.io.IOException; + import org.apache.log4j.Logger; public class SpawnedBrokerHolder implements BrokerHolder { private static final Logger LOGGER = Logger.getLogger(SpawnedBrokerHolder.class); + private final boolean _isWindows = String.valueOf(System.getProperty("os.name")).toLowerCase().contains("windows"); private final Process _process; + private final Integer _pid; + private final String _workingDirectory; - public SpawnedBrokerHolder(final Process process) + public SpawnedBrokerHolder(final Process process, final String workingDirectory) { if(process == null) { @@ -36,14 +41,87 @@ public class SpawnedBrokerHolder implements BrokerHolder } _process = process; + _pid = retrieveUnixPidIfPossible(); + _workingDirectory = workingDirectory; + } + + @Override + public String getWorkingDirectory() + { + return _workingDirectory; } public void shutdown() { LOGGER.info("Destroying broker process"); - _process.destroy(); + reapChildProcess(); + } + + @Override + public void kill() + { + if (_pid == null) + { + LOGGER.info("Destroying broker process"); + _process.destroy(); + } + else + { + LOGGER.info("Killing broker process with PID " + _pid); + sendSigkillForImmediateShutdown(_pid); + } + + reapChildProcess(); + } + + private void sendSigkillForImmediateShutdown(Integer pid) + { + boolean killSuccessful = false; + try + { + final Process killProcess = Runtime.getRuntime().exec("kill -KILL " + pid); + killProcess.waitFor(); + killSuccessful = killProcess.exitValue() == 0; + } + catch (IOException e) + { + LOGGER.error("Error whilst killing process " + _pid, e); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + finally + { + if (!killSuccessful) + { + _process.destroy(); + } + } + } + + private Integer retrieveUnixPidIfPossible() + { + if(!_isWindows) + { + try + { + Integer pid = ReflectionUtils.getDeclaredField(_process, "pid"); + LOGGER.info("PID " + pid); + return pid; + } + catch (ReflectionUtilsException e) + { + LOGGER.warn("Could not get pid for process, Broker process shutdown will be ungraceful"); + } + } + return null; + } + + private void reapChildProcess() + { try { _process.waitFor(); @@ -51,8 +129,21 @@ public class SpawnedBrokerHolder implements BrokerHolder } catch (InterruptedException e) { - LOGGER.error("Interrupted whilst waiting for process destruction"); + LOGGER.error("Interrupted whilst waiting for process shutdown"); Thread.currentThread().interrupt(); } + finally + { + try + { + _process.getInputStream().close(); + _process.getErrorStream().close(); + _process.getOutputStream().close(); + } + catch (IOException e) + { + } + } } + } diff --git a/java/test-profiles/JavaDerbyExcludes b/java/test-profiles/JavaDerbyExcludes index 3caa360d48..931a0b0ddb 100644 --- a/java/test-profiles/JavaDerbyExcludes +++ b/java/test-profiles/JavaDerbyExcludes @@ -19,3 +19,4 @@ org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#* org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#* +org.apache.qpid.server.store.berkeleydb.BDBBackupTest#* diff --git a/java/test-profiles/JavaTransientExcludes b/java/test-profiles/JavaTransientExcludes index 67190a6fcc..b4e583ba3a 100644 --- a/java/test-profiles/JavaTransientExcludes +++ b/java/test-profiles/JavaTransientExcludes @@ -34,3 +34,4 @@ org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval org.apache.qpid.server.store.berkeleydb.BDBMessageStoreTest#* org.apache.qpid.server.store.berkeleydb.BDBUpgradeTest#* +org.apache.qpid.server.store.berkeleydb.BDBBackupTest#* |
