From d96489e6a64861e943a9adffadc5bbcf18632069 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Tue, 17 Jul 2007 16:22:16 +0000 Subject: Refactored the distributed test clients and coordinator to support different distribution and sequencing engines. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556958 13f79535-47bb-0310-9956-ffa450edef68 --- .../interop/coordinator/CoordinatingTestCase.java | 263 ------ .../qpid/interop/coordinator/Coordinator.java | 214 +++-- .../qpid/interop/coordinator/DropInTest.java | 51 ++ .../qpid/interop/coordinator/FanOutTestCase.java | 179 ++++ .../interop/coordinator/FanOutTestDecorator.java | 182 ++++ .../qpid/interop/coordinator/InteropTestCase.java | 259 ++++++ .../interop/coordinator/InteropTestDecorator.java | 184 ++++ .../interop/coordinator/InvitingTestDecorator.java | 179 ++-- .../coordinator/ListeningCoordinatorTest.java | 28 - .../coordinator/ListeningTestDecorator.java | 200 ----- .../qpid/interop/coordinator/OptOutTestCase.java | 2 +- .../interop/coordinator/TestClientDetails.java | 13 +- .../qpid/interop/coordinator/XMLTestListener.java | 94 +-- .../testcases/CoordinatingTestCase1DummyRun.java | 85 -- .../testcases/CoordinatingTestCase2BasicP2P.java | 90 -- .../CoordinatingTestCase3BasicPubSub.java | 92 -- .../testcases/InteropTestCase1DummyRun.java | 89 ++ .../testcases/InteropTestCase2BasicP2P.java | 95 +++ .../testcases/InteropTestCase3BasicPubSub.java | 93 ++ .../interop/testclient/InteropClientTestCase.java | 17 +- .../apache/qpid/interop/testclient/TestClient.java | 212 ++--- .../testclient/testcases/TestCase1DummyRun.java | 47 +- .../testclient/testcases/TestCase2BasicP2P.java | 23 +- .../testclient/testcases/TestCase3BasicPubSub.java | 42 +- .../qpid/sustained/SustainedClientTestCase.java | 905 ++++++++++++++++++++ .../apache/qpid/sustained/SustainedTestCase.java | 125 +++ .../apache/qpid/sustained/SustainedTestClient.java | 931 --------------------- .../qpid/sustained/SustainedTestCoordinator.java | 222 ----- .../java/org/apache/qpid/sustained/TestClient.java | 157 ---- .../org/apache/qpid/sustained/TestCoordinator.java | 117 --- 30 files changed, 2548 insertions(+), 2642 deletions(-) delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java (limited to 'java/integrationtests/src') diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java deleted file mode 100644 index d2042be741..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.coordinator; - -import junit.framework.TestCase; - -import org.apache.log4j.Logger; - -import org.apache.qpid.util.ConversationFactory; - -import javax.jms.*; - -import java.util.Map; - -/** - * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a - * test case as defined in the interop testing specification - * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). - * - *

The real logic of the test cases built on top of this, is embeded in the comparison of the sender and receiver - * reports. An example test method might look like: - * - *

- * public void testExample()
- * {
- *   Properties testConfig = new Properties();
- *   testConfig.add("TEST_CASE", "example");
- *   ...
- *
- *   Report[] reports = sequenceTest(testConfig);
- *
- *   // Compare sender and receiver reports.
- *   if (report[0] ... report[1] ...)
- *   {
- *     Assert.fail("Sender and receiver reports did not match up.");
- *   }
- * }
- *
- * 
- * - *

- *
CRC Card
Responsibilities Collaborations - *
Accept notification of test case participants. {@link InvitingTestDecorator} - *
Accpet JMS Connection to carry out the coordination over. - *
Coordinate the test sequence amongst participants. {@link ConversationFactory} - *
Supply test properties - *
- */ -public abstract class CoordinatingTestCase extends TestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(CoordinatingTestCase.class); - - /** Holds the contact details for the sending test client. */ - protected TestClientDetails sender; - - /** Holds the contact details for the receving test client. */ - protected TestClientDetails receiver; - - /** Holds the conversation factory over which to coordinate the test. */ - protected ConversationFactory conversationFactory; - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public CoordinatingTestCase(String name) - { - super(name); - } - - /** - * Sets the sender test client to coordinate the test with. - * - * @param sender The contact details of the sending client in the test. - */ - public void setSender(TestClientDetails sender) - { - log.debug("public void setSender(TestClientDetails sender = " + sender + "): called"); - - this.sender = sender; - } - - /** - * Sets the receiving test client to coordinate the test with. - * - * @param receiver The contact details of the sending client in the test. - */ - public void setReceiver(TestClientDetails receiver) - { - log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called"); - - this.receiver = receiver; - } - - /** - * Supplies the sending test client. - * - * @return The sending test client. - */ - public TestClientDetails getSender() - { - return sender; - } - - /** - * Supplies the receiving test client. - * - * @return The receiving test client. - */ - public TestClientDetails getReceiver() - { - return receiver; - } - - /** - * Returns the name of the current test method of this test class, with the sending and receiving client names - * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated - * in it. - * - * @return The unique test and client name. - */ - public String getName() - { - if ((sender == null) || (receiver == null)) - { - return super.getName(); - } - else - { - return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName; - } - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public abstract String getTestCaseNameForTestMethod(String methodName); - - /** - * Accepts the conversation factory over which to hold the test coordinating conversation. - * - * @param conversationFactory The conversation factory to coordinate the test over. - */ - public void setConversationFactory(ConversationFactory conversationFactory) - { - this.conversationFactory = conversationFactory; - } - - /** - * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner - * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports - * from the participants. - * - * @param testProperties The test case definition. - * - * @return The test results from the senders and receivers. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - protected Message[] sequenceTest(Map testProperties) throws JMSException - { - log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); - - Session session = conversationFactory.getSession(); - Destination senderControlTopic = session.createTopic(sender.privateControlKey); - Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); - - ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); - ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); - - // Assign the sender role to the sending test client. - Message assignSender = conversationFactory.getSession().createMessage(); - setPropertiesOnMessage(assignSender, testProperties); - assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignSender.setStringProperty("ROLE", "SENDER"); - - senderConversation.send(senderControlTopic, assignSender); - - // Assign the receiver role the receiving client. - Message assignReceiver = session.createMessage(); - setPropertiesOnMessage(assignReceiver, testProperties); - assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignReceiver.setStringProperty("ROLE", "RECEIVER"); - - receiverConversation.send(receiverControlTopic, assignReceiver); - - // Wait for the senders and receivers to confirm their roles. - senderConversation.receive(); - receiverConversation.receive(); - - // Start the test. - Message start = session.createMessage(); - start.setStringProperty("CONTROL_TYPE", "START"); - - senderConversation.send(senderControlTopic, start); - - // Wait for the test sender to return its report. - Message senderReport = senderConversation.receive(); - - try - { - Thread.sleep(500); - } - catch (InterruptedException e) - { } - - // Ask the receiver for its report. - Message statusRequest = session.createMessage(); - statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); - - receiverConversation.send(receiverControlTopic, statusRequest); - - // Wait for the receiver to send its report. - Message receiverReport = receiverConversation.receive(); - - return new Message[] { senderReport, receiverReport }; - } - - /** - * Sets properties of different types on a JMS Message. - * - * @param message The message to set properties on. - * @param properties The property name/value pairs to set. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void setPropertiesOnMessage(Message message, Map properties) throws JMSException - { - for (Map.Entry entry : properties.entrySet()) - { - String name = entry.getKey(); - Object value = entry.getValue(); - - message.setObjectProperty(name, value); - } - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java index 0eb6be3a91..c6efe05c61 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -20,31 +20,33 @@ */ package org.apache.qpid.interop.coordinator; -import java.io.*; -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.*; - import junit.framework.Test; import junit.framework.TestResult; import junit.framework.TestSuite; import org.apache.log4j.Logger; -import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun; -import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P; -import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; -import org.apache.qpid.interop.testclient.TestClient; -import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.interop.coordinator.testcases.InteropTestCase1DummyRun; +import org.apache.qpid.interop.coordinator.testcases.InteropTestCase2BasicP2P; +import org.apache.qpid.interop.coordinator.testcases.InteropTestCase3BasicPubSub; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.TestUtils; import org.apache.qpid.util.ConversationFactory; import org.apache.qpid.util.PrettyPrintingUtils; import uk.co.thebadgerset.junit.extensions.TKTestResult; import uk.co.thebadgerset.junit.extensions.TKTestRunner; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; +import uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; +import javax.jms.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; + /** *

Implements the coordinator client described in the interop testing specification * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on @@ -57,12 +59,33 @@ import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; * Attach XML test result logger. * Terminate the interop testing framework. * + * + * @todo The test result is ignored, because it only contains the failures for the last test run. Shoud accumulate + * failures over all tests, and return with success or fail code based on all results. + * + * @todo Remove hard coding of test cases and put on command line instead. */ public class Coordinator extends TKTestRunner { + /** Used for debugging. */ private static final Logger log = Logger.getLogger(Coordinator.class); - public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; + /** Defines the possible distributed test engines available to run coordinated test cases with. */ + public enum TestEngine + { + /** Specifies the interop test engine. This tests all available clients in pairs. */ + INTEROP, + + /** Specifies the fanout test engine. This sets up one publisher role, and many reciever roles. */ + FANOUT + } + + /** + * Holds the test context properties that provides the default test parameters, plus command line overrides. + * This is initialized with the default test parameters, to which command line overrides may be applied. + */ + protected static ParsedProperties testContextProperties = + TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); /** Holds the URL of the broker to coordinate the tests on. */ protected String brokerUrl; @@ -86,21 +109,28 @@ public class Coordinator extends TKTestRunner protected String currentTestClassName; /** Holds the path of the directory to output test results too, if one is defined. */ - protected static String _reportDir; + protected String reportDir; + + /** Holds the coordinating test engine type to run the tests through. */ + protected TestEngine engine; /** * Creates an interop test coordinator on the specified broker and virtual host. * * @param brokerUrl The URL of the broker to connect to. * @param virtualHost The virtual host to run all tests on. Optional, may be null. + * @param reportDir The directory to write out test results to. + * @param engine The distributed test engine type to run the tests with. */ - public Coordinator(String brokerUrl, String virtualHost) + public Coordinator(String brokerUrl, String virtualHost, String reportDir, TestEngine engine) { log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called"); // Retain the connection parameters. this.brokerUrl = brokerUrl; this.virtualHost = virtualHost; + this.reportDir = reportDir; + this.engine = engine; } /** @@ -116,59 +146,69 @@ public class Coordinator extends TKTestRunner */ public static void main(String[] args) { - try + // Override the default broker url to be localhost:5672. + testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); + + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + // Any options and trailing name=value pairs are also injected into the test context properties object, + // to override any defaults that may have been set up. + Properties options = + CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" }, + { + "e", "The test execution engine to use. Default is interop.", "engine", "interop", + "^interop$|^fanout$", "true" + } + }), testContextProperties); + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String reportDir = options.getProperty("o"); + String testEngine = options.getProperty("e"); + TestEngine engine = "fanout".equals(testEngine) ? TestEngine.FANOUT : TestEngine.INTEROP; + reportDir = (reportDir == null) ? "." : reportDir; + + // If broker or virtual host settings were specified as command line options, override the defaults in the + // test context properties with them. + + // Scan for available test cases using a classpath scanner. + // Hard code the test classes till the classpath scanner is fixed. + Collection> testCaseClasses = new ArrayList>(); + // ClasspathScanner.getMatches(InteropTestCase.class, "^Test.*", true); + Collections.addAll(testCaseClasses, InteropTestCase1DummyRun.class, InteropTestCase2BasicP2P.class, + InteropTestCase3BasicPubSub.class); + + // Check that some test classes were actually found. + if (testCaseClasses.isEmpty()) { - // Use the command line parser to evaluate the command line with standard handling behaviour (print errors - // and usage then exist if there are errors). - Properties options = - CommandLineParser.processCommandLine(args, - new CommandLineParser( - new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"o", "The name of the directory to output test timings to.", "dir", "false"} - })); - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - _reportDir = options.getProperty("o"); - _reportDir = (_reportDir == null) ? "." : _reportDir; - - // Scan for available test cases using a classpath scanner. - Collection> testCaseClasses = - new ArrayList>(); - // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true); - // Hard code the test classes till the classpath scanner is fixed. - Collections.addAll(testCaseClasses, - CoordinatingTestCase1DummyRun.class, - CoordinatingTestCase2BasicP2P.class, - CoordinatingTestCase3BasicPubSub.class); - - // Check that some test classes were actually found. - if (testCaseClasses.isEmpty()) - { - throw new RuntimeException( - "No test classes implementing CoordinatingTestCase were found on the class path."); - } - - int i = 0; - String[] testClassNames = new String[testCaseClasses.size()]; + throw new RuntimeException("No test classes implementing InteropTestCase were found on the class path."); + } - for (Class testClass : testCaseClasses) - { - testClassNames[i++] = testClass.getName(); - } + // Extract the names of all the test classes, to pass to the start method. + int i = 0; + String[] testClassNames = new String[testCaseClasses.size()]; - // Create a coordinator and begin its test procedure. - Coordinator coordinator = new Coordinator(brokerUrl, virtualHost); + for (Class testClass : testCaseClasses) + { + testClassNames[i++] = testClass.getName(); + } - boolean failure = false; + // Create a coordinator and begin its test procedure. + Coordinator coordinator = new Coordinator(brokerUrl, virtualHost, reportDir, engine); + try + { TestResult testResult = coordinator.start(testClassNames); - if (failure) + // Return different error codes, depending on whether or not there were test failures. + if (testResult.failureCount() > 0) { System.exit(FAILURE_EXIT); } @@ -186,7 +226,7 @@ public class Coordinator extends TKTestRunner } /** - * Starts all of the test classes to be run by this coordinator running. + * Starts all of the test classes to be run by this coordinator. * * @param testClassNames An array of all the coordinating test case implementations. * @@ -197,10 +237,10 @@ public class Coordinator extends TKTestRunner public TestResult start(String[] testClassNames) throws Exception { log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames) - + ": called"); + + ": called"); // Connect to the broker. - connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); + connection = TestUtils.createConnection(TestContextProperties.getInstance()); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination controlTopic = session.createTopic("iop.control"); @@ -231,7 +271,7 @@ public class Coordinator extends TKTestRunner // Record the current test class, so that the test results can be output to a file incorporating this name. this.currentTestClassName = testClassName; - result = super.start(new String[]{testClassName}); + result = super.start(new String[] { testClassName }); } // At this point in time, all tests have completed. Broadcast the shutdown message. @@ -255,7 +295,7 @@ public class Coordinator extends TKTestRunner public static Set extractEnlists(Collection enlists) throws JMSException { log.debug("public static Set extractEnlists(Collection enlists = " + enlists - + "): called"); + + "): called"); Set enlistedClients = new HashSet(); @@ -304,17 +344,17 @@ public class Coordinator extends TKTestRunner Test nextTest = suite.testAt(i); log.debug("suite.testAt(" + i + ") = " + nextTest); - if (nextTest instanceof CoordinatingTestCase) + if (nextTest instanceof InteropTestCase) { - log.debug("nextTest is a CoordinatingTestCase"); + log.debug("nextTest is a InteropTestCase"); } } targetTest = new WrappedSuiteTestDecorator(suite); log.debug("Wrapped with a WrappedSuiteTestDecorator."); } - // Wrap the tests in an inviting test decorator, to perform the invite/test cycle. + // Wrap the tests in a suitable distributed test decorator, to perform the invite/test cycle. targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection); TestSuite suite = new TestSuite(); @@ -326,9 +366,28 @@ public class Coordinator extends TKTestRunner return super.doRun(suite, wait); } - protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set enlistedClients, ConversationFactory conversationFactory, Connection connection) + /** + * Creates a wrapped test decorator, that is capable of inviting enlisted clients to participate in a specified + * test. This is the test engine that sets up the roles and sequences a distributed test case. + * + * @param targetTest The test decorator to wrap. + * @param enlistedClients The enlisted clients available to run the test. + * @param conversationFactory The conversation factory used to build conversation helper over the specified connection. + * @param connection The connection to talk to the enlisted clients over. + * + * @return An invititing test decorator, that invites all the enlisted clients to participate in tests, in pairs. + */ + protected InvitingTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, + Set enlistedClients, ConversationFactory conversationFactory, Connection connection) { - return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + switch (engine) + { + case FANOUT: + return new FanOutTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + case INTEROP: + default: + return new InteropTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + } } /** @@ -343,18 +402,18 @@ public class Coordinator extends TKTestRunner TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName); // Check if a directory to output reports to has been specified and attach test listeners if so. - if (_reportDir != null) + if (reportDir != null) { // Create the report directory if it does not already exist. - File reportDirFile = new File(_reportDir); + File reportDirFile = new File(reportDir); if (!reportDirFile.exists()) { reportDirFile.mkdir(); } - // Create the timings file (make the name of this configurable as a command line parameter). - Writer timingsWriter = null; + // Create the results file (make the name of this configurable as a command line parameter). + Writer timingsWriter; try { @@ -366,7 +425,7 @@ public class Coordinator extends TKTestRunner throw new RuntimeException("Unable to create the log file to write test results to: " + e, e); } - // Set up a CSV results listener to output the timings to the results file. + // Set up an XML results listener to output the timings to the results file. XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName); result.addListener(listener); result.addTKTestListener(listener); @@ -385,9 +444,4 @@ public class Coordinator extends TKTestRunner return result; } - - public void setReportDir(String reportDir) - { - _reportDir = reportDir; - } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java new file mode 100644 index 0000000000..f7e38fb1ad --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/DropInTest.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * A DropIn test is a test case that can accept late joining test clients into a running test. This can be usefull, + * for interactive experimentation. + * + *

+ *
CRC Card
Responsibilities + *
Accept late joining test clients. + *
+ */ +public interface DropInTest +{ + /** + * Should accept a late joining client into a running test case. The client will be enlisted with a control message + * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields: + * + *

+ *
CLIENT_NAME A unique name for the new client. + *
CLIENT_PRIVATE_CONTROL_KEY The key for the route on which the client receives its control messages. + *
+ * + * @param message The late joiners join message. + * + * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed. + */ + public void lateJoin(Message message) throws JMSException; +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java new file mode 100644 index 0000000000..ba737dffab --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestCase.java @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.util.ConversationFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * FanOutTestCase is a {@link org.apache.qpid.interop.coordinator.InteropTestCase} across one sending client and + * zero or more receiving clients. Its main purpose is to coordinate the setting up of one test client in the sending + * role and the remainder in the receiving role. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Accept notification of test case participants. + * {@link org.apache.qpid.interop.coordinator.InvitingTestDecorator} + *
Accept JMS Connection to carry out the coordination over. + *
Coordinate the test sequence amongst participants. {@link ConversationFactory} + *
Supply test properties + *
+ * + * @todo Gather all the receivers reports. + */ +public abstract class FanOutTestCase extends InteropTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(FanOutTestCase.class); + + /** The test clients in the receiving role. */ + private List receivers = new LinkedList(); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public FanOutTestCase(String name) + { + super(name); + } + + /** + * Adds a receiver to this test. + * + * @param receiver The contact details of the sending client in the test. + */ + public void setReceiver(TestClientDetails receiver) + { + receivers.add(receiver); + } + + /** + * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop + * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the + * participants. + * + * @param testProperties The test case definition. + * + * @return The test results from the senders and receivers. The senders report will always be returned first, + * followed by the receivers reports. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected Message[] sequenceTest(Map testProperties) throws JMSException + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + // Create a conversation on the sender clients private control rouete. + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + // Wait for the sender to confirm its role. + senderConversation.receive(); + + // Assign the receivers roles. + for (TestClientDetails receiver : receivers) + { + assignReceiverRole(receiver, testProperties, true); + } + + // Start the test on the sender. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + TestUtils.pause(500); + + // Ask the receivers for their reports. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + // Gather the reports from all of the receiving clients. + + // Return all of the test reports, the senders report first. + return new Message[] { senderReport }; + } + + /** + * Assigns the receiver role to the specified test client that is to act as a receiver during the test. This method + * does not always wait for the receiving clients to confirm their role assignments. This is because this method + * may be called from an 'onMessage' method, when a client is joining the test at a later point in time, and it + * is not possible to do a synchronous receive during an 'onMessage' method. There is a flag to indicate whether + * or not to wait for role confirmations. + * + * @param receiver The test client to assign the receiver role to. + * @param testProperties The test parameters. + * @param confirm Indicates whether role confirmation should be waited for. + * + * @throws JMSException Any JMSExceptions occurring during the conversation are allowed to fall through. + */ + protected void assignReceiverRole(TestClientDetails receiver, Map testProperties, boolean confirm) + throws JMSException + { + log.info("assignReceiverRole(TestClientDetails receiver = " + receiver + ", Map testProperties = " + + testProperties + "): called"); + + // Create a conversation with the receiving test client. + Session session = conversationFactory.getSession(); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + + // Assign the receiver role to the receiving client. + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", receiver.clientName); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + // Wait for the role confirmation to come back. + if (confirm) + { + receiverConversation.receive(); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java new file mode 100644 index 0000000000..5e3fb51b97 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/FanOutTestDecorator.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ConversationFactory; + +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +/** + * FanOutTestDecorator is an {@link InvitingTestDecorator} that runs one test client in the sender role, and the remainder + * in the receiver role. It also has the capability to listen for new test cases joining the test beyond the initial start + * point. This feature can be usefull when experimenting with adding more load, in the form of more test clients, to assess + * its impact on a running test. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Execute coordinated test cases. {@link InteropTestCase} + *
Accept test clients joining a running test. + *
+ */ +public class FanOutTestDecorator extends InvitingTestDecorator implements MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(FanOutTestDecorator.class); + + /** Holds the currently running test case. */ + InteropTestCase currentTest = null; + + /** + * Creates a wrapped suite test decorator from another one. + * + * @param suite The test suite. + * @param availableClients The list of all clients that responded to the compulsory invite. + * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. + */ + public FanOutTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite, availableClients, controlConversation, controlConnection); + + log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + } + + /** + * Broadcasts a test invitation and accepts enlists from participating clients. The wrapped test cases are run + * with one test client in the sender role, and the remaining test clients in the receiving role. + * + *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime + * exceptions, resulting in the non-completion of the test run. + * + * @param testResult The the results object to monitor the test results with. + * + * @todo Better error recovery for failure of the invite/enlist conversation could be added. + */ + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection tests = testSuite.getAllUnderlyingTests(); + + // Listen for late joiners on the control topic. + try + { + conversationFactory.getSession().createConsumer(controlTopic).setMessageListener(this); + } + catch (JMSException e) + { + throw new RuntimeException("Unable to set up the message listener on the control topic.", e); + } + + // Run all of the test cases in the test suite. + for (Test test : tests) + { + InteropTestCase coordTest = (InteropTestCase) test; + + // Get all of the clients able to participate in the test. + Set enlists = signupClients(coordTest); + + // Check that there were some clients available. + if (enlists.size() == 0) + { + throw new RuntimeException("No clients to test with"); + } + + // Set up the first client in the sender role, and the remainder in the receiver role. + Iterator clients = enlists.iterator(); + coordTest.setSender(clients.next()); + + while (clients.hasNext()) + { + // Set the sending and receiving client details on the test case. + coordTest.setReceiver(clients.next()); + } + + // Pass down the connection to hold the coordinating conversation over. + coordTest.setConversationFactory(conversationFactory); + + // If the current test case is a drop-in test, set it up as the currently running test for late joiners to + // add in to. Otherwise the current test field is set to null, to indicate that late joiners are not allowed. + currentTest = (coordTest instanceof DropInTest) ? coordTest : null; + + // Execute the test case. + coordTest.run(testResult); + + currentTest = null; + } + } + + /** + * Listens to incoming messages on the control topic. If the messages are 'join' messages, signalling a new + * test client wishing to join the current test, then the new client will be added to the current test in the + * receiver role. + * + * @param message The incoming control message. + */ + public void onMessage(Message message) + { + try + { + // Check if the message is from a test client attempting to join a running test, and join it to the current + // test case if so. + if (message.getStringProperty("CONTROL_TYPE").equals("JOIN") && (currentTest != null)) + { + ((DropInTest) currentTest).lateJoin(message); + } + } + // There is not a lot can be done with this error, so it is deliberately ignored. + catch (JMSException e) + { + log.debug("Unable to process message:" + message); + } + } + + /** + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. + */ + public String toString() + { + return "FanOutTestDecorator: [ testSuite = " + testSuite + " ]"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java new file mode 100644 index 0000000000..f895b781f0 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestCase.java @@ -0,0 +1,259 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.util.ConversationFactory; + +import javax.jms.*; + +import java.util.Map; + +/** + * A InteropTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a + * test case as defined in the interop testing specification + * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). + * + *

The real logic of the test cases built on top of this, is embeded in the comparison of the sender and receiver + * reports. An example test method might look like: + * + *

+ * public void testExample()
+ * {
+ *   Properties testConfig = new Properties();
+ *   testConfig.add("TEST_CASE", "example");
+ *   ...
+ *
+ *   Report[] reports = sequenceTest(testConfig);
+ *
+ *   // Compare sender and receiver reports.
+ *   if (report[0] ... report[1] ...)
+ *   {
+ *     Assert.fail("Sender and receiver reports did not match up.");
+ *   }
+ * }
+ *
+ * 
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Accept notification of test case participants. {@link InvitingTestDecorator} + *
Accpet JMS Connection to carry out the coordination over. + *
Coordinate the test sequence amongst participants. {@link ConversationFactory} + *
Supply test properties + *
+ */ +public abstract class InteropTestCase extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase.class); + + /** Holds the contact details for the sending test client. */ + protected TestClientDetails sender; + + /** Holds the contact details for the receving test client. */ + protected TestClientDetails receiver; + + /** Holds the conversation factory over which to coordinate the test. */ + protected ConversationFactory conversationFactory; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase(String name) + { + super(name); + } + + /** + * Sets the sender test client to coordinate the test with. + * + * @param sender The contact details of the sending client in the test. + */ + public void setSender(TestClientDetails sender) + { + log.debug("public void setSender(TestClientDetails sender = " + sender + "): called"); + + this.sender = sender; + } + + /** + * Sets the receiving test client to coordinate the test with. + * + * @param receiver The contact details of the sending client in the test. + */ + public void setReceiver(TestClientDetails receiver) + { + log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called"); + + this.receiver = receiver; + } + + /** + * Supplies the sending test client. + * + * @return The sending test client. + */ + public TestClientDetails getSender() + { + return sender; + } + + /** + * Supplies the receiving test client. + * + * @return The receiving test client. + */ + public TestClientDetails getReceiver() + { + return receiver; + } + + /** + * Returns the name of the current test method of this test class, with the sending and receiving client names + * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated + * in it. + * + * @return The unique test and client name. + */ + public String getName() + { + if ((sender == null) || (receiver == null)) + { + return super.getName(); + } + else + { + return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName; + } + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public abstract String getTestCaseNameForTestMethod(String methodName); + + /** + * Accepts the conversation factory over which to hold the test coordinating conversation. + * + * @param conversationFactory The conversation factory to coordinate the test over. + */ + public void setConversationFactory(ConversationFactory conversationFactory) + { + this.conversationFactory = conversationFactory; + } + + /** + * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner + * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports + * from the participants. + * + * @param testProperties The test case definition. + * + * @return The test results from the senders and receivers. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected Message[] sequenceTest(Map testProperties) throws JMSException + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + // Assign the receiver role the receiving client. + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + // Wait for the senders and receivers to confirm their roles. + senderConversation.receive(); + receiverConversation.receive(); + + // Start the test. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + TestUtils.pause(500); + + // Ask the receiver for its report. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + receiverConversation.send(receiverControlTopic, statusRequest); + + // Wait for the receiver to send its report. + Message receiverReport = receiverConversation.receive(); + + return new Message[] { senderReport, receiverReport }; + } + + /** + * Sets properties of different types on a JMS Message. + * + * @param message The message to set properties on. + * @param properties The property name/value pairs to set. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void setPropertiesOnMessage(Message message, Map properties) throws JMSException + { + for (Map.Entry entry : properties.entrySet()) + { + String name = entry.getKey(); + Object value = entry.getValue(); + + message.setObjectProperty(name, value); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java new file mode 100644 index 0000000000..85d127110d --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InteropTestDecorator.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ConversationFactory; + +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; + +import java.util.*; + +/** + * InvitingTestDecorator is a test decorator, written to implement the interop test specification. Given a list + * of enlisted test clients, that are available to run interop tests, this decorator invites them to participate + * in each test in the wrapped test suite. Amongst all the clients that respond to the invite, all pairs are formed, + * and each pairing (in both directions, but excluding the reflexive pairings) is split into a sender and receiver + * role and a test case run between them. Any enlisted combinations that do not accept a test invite are automatically + * failed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Broadcast test invitations and collect enlists. {@link org.apache.qpid.util.ConversationFactory}. + *
Output test failures for clients unwilling to run the test case. {@link Coordinator} + *
Execute coordinated test cases. {@link InteropTestCase} + *
Fail non participating pairings. {@link OptOutTestCase} + *
+ */ +public class InteropTestDecorator extends InvitingTestDecorator +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestDecorator.class); + + /** + * Creates a wrapped suite test decorator from another one. + * + * @param suite The test suite. + * @param availableClients The list of all clients that responded to the compulsory invite. + * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. + */ + public InteropTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite, availableClients, controlConversation, controlConnection); + } + + /** + * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is + * then repeated for every combination of test clients (provided the wrapped test case extends + * {@link InteropTestCase}. + * + *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions, + * resulting in the non-completion of the test run. + * + * @todo Better error recovery for failure of the invite/enlist conversation could be added. + * + * @param testResult The the results object to monitor the test results with. + */ + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection tests = testSuite.getAllUnderlyingTests(); + + for (Test test : tests) + { + InteropTestCase coordTest = (InteropTestCase) test; + + // Broadcast the invitation to find out what clients are available to test. + Set enlists = signupClients(coordTest); + + // Compare the list of willing clients to the list of all available. + Set optOuts = new HashSet(allClients); + optOuts.removeAll(enlists); + + // Output test failures for clients that will not particpate in the test. + Set> failPairs = allPairs(optOuts, allClients); + + for (List failPair : failPairs) + { + InteropTestCase failTest = new OptOutTestCase("testOptOut"); + failTest.setSender(failPair.get(0)); + failTest.setReceiver(failPair.get(1)); + + failTest.run(testResult); + } + + // Loop over all combinations of clients, willing to run the test. + Set> enlistedPairs = allPairs(enlists, enlists); + + for (List enlistedPair : enlistedPairs) + { + // Set the sending and receiving client details on the test case. + coordTest.setSender(enlistedPair.get(0)); + coordTest.setReceiver(enlistedPair.get(1)); + + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + + // Execute the test case. + coordTest.run(testResult); + } + } + } + + /** + * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is + * important, that is the pair is distinct from ; both pairs are generated. For any element, i, in + * both the left and right sets, the reflexive pair is not generated. + * + * @param left The left set. + * @param right The right set. + * @param The type of the content of the pairs. + * + * @return All pairs formed from the permutations of all elements of the left and right sets. + */ + private Set> allPairs(Set left, Set right) + { + log.debug("private Set> allPairs(Set left = " + left + ", Set right = " + right + "): called"); + + Set> results = new HashSet>(); + + // Form all pairs from left to right. + // Form all pairs from right to left. + for (E le : left) + { + for (E re : right) + { + if (!le.equals(re)) + { + results.add(new Pair(le, re)); + results.add(new Pair(re, le)); + } + } + } + + log.debug("results = " + results); + + return results; + } + + /** + * A simple implementation of a pair, using a list. + */ + private class Pair extends ArrayList + { + /** + * Creates a new pair of elements. + * + * @param first The first element. + * @param second The second element. + */ + public Pair(T first, T second) + { + super(); + super.add(first); + super.add(second); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java index 8695f7f66f..1225d74fbf 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -20,14 +20,6 @@ */ package org.apache.qpid.interop.coordinator; -import java.util.*; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; - -import junit.framework.Test; import junit.framework.TestResult; import org.apache.log4j.Logger; @@ -36,16 +28,26 @@ import org.apache.qpid.util.ConversationFactory; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.*; + /** + * InvitingTestDecorator is a base class for writing test decorators that invite test clients to participate in + * distributed test cases. It provides a helper method, {@link #signupClients(InteropTestCase)}, that broadcasts + * an invitation and return the set of test clients that are available to particiapte in the test. + * *

*
CRC Card
Responsibilities Collaborations *
Broadcast test invitations and collect enlists. {@link ConversationFactory}. - *
Output test failures for clients unwilling to run the test case. {@link Coordinator} - *
Execute coordinated test cases. {@link CoordinatingTestCase} *
*/ -public class InvitingTestDecorator extends WrappedSuiteTestDecorator +public abstract class InvitingTestDecorator extends WrappedSuiteTestDecorator { + /** Used for debugging. */ private static final Logger log = Logger.getLogger(InvitingTestDecorator.class); /** Holds the contact information for all test clients that are available and that may take part in the test. */ @@ -57,9 +59,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator /** Holds the connection that the control conversation is held over. */ Connection connection; - /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ + /** Holds the underlying {@link InteropTestCase}s that this decorator wraps. */ WrappedSuiteTestDecorator testSuite; + /** Holds the control topic, on which test invitations are broadcast. */ + protected Destination controlTopic; + /** * Creates a wrapped suite test decorator from another one. * @@ -80,141 +85,67 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator allClients = availableClients; conversationFactory = controlConversation; connection = controlConnection; - } - /** - * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is - * then repeated for every combination of test clients (provided the wrapped test case extends - * {@link CoordinatingTestCase}. - * - *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions, - * resulting in the non-completion of the test run. - * - * @todo Better error recovery for failure of the invite/enlist conversation could be added. - * - * @param testResult The the results object to monitor the test results with. - */ - public void run(TestResult testResult) - { - log.debug("public void run(TestResult testResult): called"); - - Collection tests = testSuite.getAllUnderlyingTests(); - - for (Test test : tests) + // Set up the test control topic. + try + { + controlTopic = conversationFactory.getSession().createTopic("iop.control"); + } + catch (JMSException e) { - CoordinatingTestCase coordTest = (CoordinatingTestCase) test; - - // Broadcast the invitation to find out what clients are available to test. - Set enlists; - try - { - Message invite = conversationFactory.getSession().createMessage(); - Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); - ConversationFactory.Conversation conversation = conversationFactory.startConversation(); - - invite.setStringProperty("CONTROL_TYPE", "INVITE"); - invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - - conversation.send(controlTopic, invite); - - // Wait for a short time, to give test clients an opportunity to reply to the invitation. - Collection replies = conversation.receiveAll(allClients.size(), 3000); - enlists = Coordinator.extractEnlists(replies); - } - catch (JMSException e) - { - throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); - } - - // Compare the list of willing clients to the list of all available. - Set optOuts = new HashSet(allClients); - optOuts.removeAll(enlists); - - // Output test failures for clients that will not particpate in the test. - Set> failPairs = allPairs(optOuts, allClients); - - for (List failPair : failPairs) - { - CoordinatingTestCase failTest = new OptOutTestCase("testOptOut"); - failTest.setSender(failPair.get(0)); - failTest.setReceiver(failPair.get(1)); - - failTest.run(testResult); - } - - // Loop over all combinations of clients, willing to run the test. - Set> enlistedPairs = allPairs(enlists, enlists); - - for (List enlistedPair : enlistedPairs) - { - // Set the sending and receiving client details on the test case. - coordTest.setSender(enlistedPair.get(0)); - coordTest.setReceiver(enlistedPair.get(1)); - - // Pass down the connection to hold the coordination conversation over. - coordTest.setConversationFactory(conversationFactory); - - // Execute the test case. - coordTest.run(testResult); - } + throw new RuntimeException("Unable to create the coordinating control topic to broadcast test invites on.", e); } } /** - * Prints a string summarizing this test decorator, mainly for debugging purposes. + * Should run all of the tests in the wrapped test suite. * - * @return String representation for debugging purposes. + * @param testResult The the results object to monitor the test results with. */ - public String toString() - { - return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]"; - } + public abstract void run(TestResult testResult); /** - * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is - * important, that is the pair is distinct from ; both pairs are generated. For any element, i, in - * both the left and right sets, the reflexive pair is not generated. + * Broadcasts an invitation to participate in a coordinating test case to find out what clients are available to + * run the test case. * - * @param left The left set. - * @param right The right set. + * @param coordTest The coordinating test case to broadcast an inviate for. * - * @return All pairs formed from the permutations of all elements of the left and right sets. + * @return A set of test clients that accepted the invitation. */ - private Set> allPairs(Set left, Set right) + protected Set signupClients(InteropTestCase coordTest) { - log.debug("private Set> allPairs(Set left = " + left + ", Set right = " + right + "): called"); + // Broadcast the invitation to find out what clients are available to test. + Set enlists; + try + { + Message invite = conversationFactory.getSession().createMessage(); - Set> results = new HashSet>(); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); - // Form all pairs from left to right. - // Form all pairs from right to left. - for (E le : left) + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection replies = conversation.receiveAll(allClients.size(), 3000); + enlists = Coordinator.extractEnlists(replies); + } + catch (JMSException e) { - for (E re : right) - { - if (!le.equals(re)) - { - results.add(new Pair(le, re)); - results.add(new Pair(re, le)); - } - } + throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); } - log.debug("results = " + results); - - return results; + return enlists; } /** - * A simple implementation of a pair, using a list. + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. */ - private class Pair extends ArrayList + public String toString() { - public Pair(T first, T second) - { - super(); - super.add(first); - super.add(second); - } + return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]"; } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java deleted file mode 100644 index 1b4461f8c2..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.interop.coordinator; - -import javax.jms.Message; - -public interface ListeningCoordinatorTest -{ - public void latejoin(Message message); -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java deleted file mode 100644 index 4312dfbcc6..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.coordinator; - -import junit.framework.Test; -import junit.framework.TestResult; -import org.apache.log4j.Logger; -import org.apache.qpid.util.ConversationFactory; -import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -/** - *

CRC Card
Responsibilities Collaborations
Broadcast test - * invitations and collect enlists. {@link ConversationFactory}.
Output test failures for clients - * unwilling to run the test case. {@link Coordinator}
Execute coordinated test cases. {@link - * CoordinatingTestCase}
- */ -public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener -{ - private static final Logger log = Logger.getLogger(ListeningTestDecorator.class); - - /** Holds the contact information for all test clients that are available and that may take part in the test. */ - Set allClients; - - /** Holds the conversation helper for the control level conversation for coordinating the test through. */ - ConversationFactory conversationFactory; - - /** Holds the connection that the control conversation is held over. */ - Connection connection; - - /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ - WrappedSuiteTestDecorator testSuite; - - /** Hold the current running test case. */ - CoordinatingTestCase _currentTest = null; - - /** - * Creates a wrapped suite test decorator from another one. - * - * @param suite The test suite. - * @param availableClients The list of all clients that responded to the compulsory invite. - * @param controlConversation The conversation helper for the control level, test coordination conversation. - * @param controlConnection The connection that the coordination messages are sent over. - */ - public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, - ConversationFactory controlConversation, Connection controlConnection) - { - super(suite); - - log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " - + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); - - testSuite = suite; - allClients = availableClients; - conversationFactory = controlConversation; - connection = controlConnection; - } - - /** - * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then - * repeated for every combination of test clients (provided the wrapped test case extends {@link - * CoordinatingTestCase}. - * - *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime - * exceptions, resulting in the non-completion of the test run. - * - * @param testResult The the results object to monitor the test results with. - * - * @todo Better error recovery for failure of the invite/enlist conversation could be added. - */ - public void run(TestResult testResult) - { - log.debug("public void run(TestResult testResult): called"); - - Collection tests = testSuite.getAllUnderlyingTests(); - - for (Test test : tests) - { - CoordinatingTestCase coordTest = (CoordinatingTestCase) test; - - Set enlists = signupClients(coordTest); - - if (enlists.size() == 0) - { - throw new RuntimeException("No clients to test with"); - } - - Iterator clients = enlists.iterator(); - coordTest.setSender(clients.next()); - - while (clients.hasNext()) - { - // Set the sending and receiving client details on the test case. - coordTest.setReceiver(clients.next()); - } - - // Pass down the connection to hold the coordination conversation over. - coordTest.setConversationFactory(conversationFactory); - - - if (coordTest instanceof ListeningCoordinatorTest) - { - _currentTest = coordTest; - } - // Execute the test case. - coordTest.run(testResult); - - _currentTest = null; - } - } - - private Set signupClients(CoordinatingTestCase coordTest) - { - // Broadcast the invitation to find out what clients are available to test. - Set enlists; - try - { - Message invite = conversationFactory.getSession().createMessage(); - Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); - ConversationFactory.Conversation conversation = conversationFactory.startConversation(); - - invite.setStringProperty("CONTROL_TYPE", "INVITE"); - invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - - conversation.send(controlTopic, invite); - - // Wait for a short time, to give test clients an opportunity to reply to the invitation. - Collection replies = conversation.receiveAll(allClients.size(), 5000); - - log.debug("Received " + replies.size() + " enlist replies"); - - enlists = Coordinator.extractEnlists(replies); - - //Create topic to listen on for latejoiners - Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - - //Listen for joiners - conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this); - log.debug("Created consumer on :" + listenTopic); - } - catch (JMSException e) - { - throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); - } - - return enlists; - } - - /** - * Prints a string summarizing this test decorator, mainly for debugging purposes. - * - * @return String representation for debugging purposes. - */ - public String toString() - { - return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]"; - } - - - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN")) - { - ((ListeningCoordinatorTest) _currentTest).latejoin(message); - } - } - catch (JMSException e) - { - log.debug("Unable to process message:" + message); - } - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java index 42a382a898..4332aaf55c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java @@ -32,7 +32,7 @@ import junit.framework.Assert; * Fail the test with a suitable reason. * */ -public class OptOutTestCase extends CoordinatingTestCase +public class OptOutTestCase extends InteropTestCase { /** * Creates a new coordinating test case with the specified name. diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java index c4a9d39cd8..742375b7bd 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -21,8 +21,12 @@ package org.apache.qpid.interop.coordinator; /** + * TestClientDetails is used to encapsulate information about an interop test client. It pairs together the unique + * name of the client, and the route on which it listens to its control messages. + * *

*
CRC Card
Responsibilities Collaborations + *
Record test clients control addresses together with their names. *
*/ public class TestClientDetails @@ -56,13 +60,8 @@ public class TestClientDetails final TestClientDetails testClientDetails = (TestClientDetails) o; - if ((clientName != null) ? (!clientName.equals(testClientDetails.clientName)) - : (testClientDetails.clientName != null)) - { - return false; - } - - return true; + return !((clientName != null) ? (!clientName.equals(testClientDetails.clientName)) + : (testClientDetails.clientName != null)); } /** diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java index 747ba0dd0b..74c86b1d83 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java @@ -18,14 +18,8 @@ * under the License. * */ - package org.apache.qpid.interop.coordinator; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.Writer; -import java.util.*; - import junit.framework.AssertionFailedError; import junit.framework.Test; import junit.framework.TestCase; @@ -34,6 +28,11 @@ import org.apache.log4j.Logger; import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Writer; +import java.util.*; + /** * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified * writer. @@ -50,6 +49,12 @@ import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener; * *

*
CRC Card
Responsibilities Collaborations + *
Listen to test lifecycle notifications. + *
Listen to test errors and failures. + *
Listen to test timings. + *
Listen to test memory usages. + *
Listen to parameterized test parameters. + *
Responsibilities *
* * @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring @@ -97,7 +102,8 @@ public class XMLTestListener implements TKTestListener /** * Creates a new XML results output listener that writes to the specified location. * - * @param writer The location to write results to. + * @param writer The location to write results to. + * @param testClassName The name of the test class to include in the test results. */ public XMLTestListener(Writer writer, String testClassName) { @@ -126,7 +132,9 @@ public class XMLTestListener implements TKTestListener } /** - * A test started. + * Notification that a test started. + * + * @param test The test that started. */ public void startTest(Test test) { @@ -189,7 +197,9 @@ public class XMLTestListener implements TKTestListener { } /** - * A test ended. + * Notification that a test ended. + * + * @param test The test that ended. */ public void endTest(Test test) { @@ -225,6 +235,9 @@ public class XMLTestListener implements TKTestListener /** * An error occurred. + * + * @param test The test in which the error occurred. + * @param t The throwable that resulted from the error. */ public void addError(Test test, Throwable t) { @@ -237,6 +250,9 @@ public class XMLTestListener implements TKTestListener /** * A failure occurred. + * + * @param test The test in which the failure occurred. + * @param t The JUnit assertions that led to the failure. */ public void addFailure(Test test, AssertionFailedError t) { @@ -339,13 +355,10 @@ public class XMLTestListener implements TKTestListener */ protected static class Result { - public Result(String testClass, String testName) - { - this.testClass = testClass; - this.testName = testName; - } - + /** Holds the name of the test class. */ public String testClass; + + /** Holds the name of the test method. */ public String testName; /** Holds the exception that caused error in this test. */ @@ -354,49 +367,16 @@ public class XMLTestListener implements TKTestListener /** Holds the assertion exception that caused failure in this test. */ public AssertionFailedError failure; - /** Holds the error count for this test. */ - // public int errors = 0; - - /** Holds the failure count for this tests. */ - // public int failures = 0; - - /** Holds the overall tests run count for this test. */ - // public int runs = 0; - - /*public boolean equals(Object o) + /** + * Creates a placeholder for the results of a test. + * + * @param testClass The test class. + * @param testName The name of the test that was run. + */ + public Result(String testClass, String testName) { - if (this == o) - { - return true; - } - - if (!(o instanceof Result)) - { - return false; - } - - final Result result = (Result) o; - - if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null)) - { - return false; - } - - if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null)) - { - return false; - } - - return true; + this.testClass = testClass; + this.testName = testName; } - - public int hashCode() - { - int result; - result = ((testClass != null) ? testClass.hashCode() : 0); - result = (29 * result) + ((testName != null) ? testName.hashCode() : 0); - - return result; - }*/ } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java deleted file mode 100644 index e642ef792b..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.interop.coordinator.testcases; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Message; - -import junit.framework.Assert; - -import org.apache.log4j.Logger; - -import org.apache.qpid.interop.coordinator.CoordinatingTestCase; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
Exercises the interop testing framework without actually sending any test messages. - * {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase} - *
- */ -public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public CoordinatingTestCase1DummyRun(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - */ - public void testDummyRun() throws Exception - { - log.debug("public void testDummyRun(): called"); - - Map testConfig = new HashMap(); - testConfig.put("TEST_NAME", "TC1_DummyRun"); - - Message[] reports = sequenceTest(testConfig); - - // Compare sender and receiver reports. - Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC1_DummyRun"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java deleted file mode 100644 index b1b2d9f847..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.interop.coordinator.testcases; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Message; - -import junit.framework.Assert; - -import org.apache.log4j.Logger; - -import org.apache.qpid.interop.coordinator.CoordinatingTestCase; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
Setup p2p test parameters and compare with test output. {@link CoordinatingTestCase} - *
- */ -public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public CoordinatingTestCase2BasicP2P(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - */ - public void testBasicP2P() throws Exception - { - log.debug("public void testBasicP2P(): called"); - - Map testConfig = new HashMap(); - testConfig.put("TEST_NAME", "TC2_BasicP2P"); - testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); - testConfig.put("P2P_NUM_MESSAGES", 50); - - Message[] reports = sequenceTest(testConfig); - - // Compare sender and receiver reports. - int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); - int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); - - Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); - Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC2_BasicP2P"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java deleted file mode 100644 index 702c240e9a..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.interop.coordinator.testcases; - -import java.util.HashMap; -import java.util.Map; - -import javax.jms.Message; - -import junit.framework.Assert; - -import org.apache.log4j.Logger; - -import org.apache.qpid.interop.coordinator.CoordinatingTestCase; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
Setup pub/sub test parameters and compare with test output. {@link CoordinatingTestCase} - *
- */ -public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public CoordinatingTestCase3BasicPubSub(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - */ - public void testBasicPubSub() throws Exception - { - log.debug("public void testBasicPubSub(): called"); - - Map testConfig = new HashMap(); - testConfig.put("TEST_NAME", "TC3_BasicPubSub"); - testConfig.put("PUBSUB_KEY", "tc3route"); - testConfig.put("PUBSUB_NUM_MESSAGES", 10); - testConfig.put("PUBSUB_NUM_RECEIVERS", 5); - - Message[] reports = sequenceTest(testConfig); - - // Compare sender and receiver reports. - int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); - int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); - - Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); - Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, - messagesReceived); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC3_BasicPubSub"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java new file mode 100644 index 0000000000..b74a55d964 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase1DummyRun.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator.testcases; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.InteropTestCase; + +import javax.jms.Message; + +import java.util.HashMap; +import java.util.Map; + +/** + * Coordinates test case 1, from the interop test specification. This test connects up the sender and receiver roles, + * and gets some dummy test reports from them, in order to check that the test framework itself is operational. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Exercises the interop testing framework without actually sending any test messages. + * {@link org.apache.qpid.interop.coordinator.InteropTestCase} + *
+ */ +public class InteropTestCase1DummyRun extends InteropTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase1DummyRun.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase1DummyRun(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testDummyRun() throws Exception + { + log.debug("public void testDummyRun(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC1_DummyRun"); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC1_DummyRun"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java new file mode 100644 index 0000000000..406b8b42a6 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase2BasicP2P.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator.testcases; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.InteropTestCase; + +import javax.jms.Message; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50 + * messages. It checks that the sender and receiver reports both indicate that all the test messages were transmitted + * successfully. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Setup p2p test parameters and compare with test output. {@link InteropTestCase} + *
+ */ +public class InteropTestCase2BasicP2P extends InteropTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase2BasicP2P(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicP2P() throws Exception + { + log.debug("public void testBasicP2P(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC2_BasicP2P"); + testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); + testConfig.put("P2P_NUM_MESSAGES", 50); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); + Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC2_BasicP2P"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java new file mode 100644 index 0000000000..ebb4cd764e --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/InteropTestCase3BasicPubSub.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator.testcases; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.InteropTestCase; + +import javax.jms.Message; + +import java.util.HashMap; +import java.util.Map; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Setup pub/sub test parameters and compare with test output. {@link InteropTestCase} + *
+ */ +public class InteropTestCase3BasicPubSub extends InteropTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase3BasicPubSub(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testBasicPubSub(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC3_BasicPubSub"); + testConfig.put("PUBSUB_KEY", "tc3route"); + testConfig.put("PUBSUB_NUM_MESSAGES", 10); + testConfig.put("PUBSUB_NUM_RECEIVERS", 5); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); + Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, + messagesReceived); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC3_BasicPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java index 37952d08c8..87f09faf1e 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -44,7 +44,11 @@ public interface InteropClientTestCase extends MessageListener /** Defines the possible test case roles that an interop test case can take on. */ public enum Roles { - SENDER, RECEIVER; + /** Specifies the sender role. */ + SENDER, + + /** Specifies the receiver role. */ + RECEIVER } /** @@ -78,19 +82,12 @@ public interface InteropClientTestCase extends MessageListener public void assignRole(Roles role, Message assignRoleMessage) throws JMSException; /** - * Performs the test case actions. - * return from here when you have finished the test.. this will signal the controller that the test has ended. + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. */ public void start() throws JMSException; - /** - * Gives notice of termination of the test case actions. - * - * @throws JMSException Any JMSException resulting from allowed to fall through. - */ - public void terminate() throws JMSException, InterruptedException; - /** * Gets a report on the actions performed by the test case in its assigned role. * diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index a904bfa419..baf8bc033d 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -21,42 +21,37 @@ package org.apache.qpid.interop.testclient; import org.apache.log4j.Logger; + import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; -import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.PropertiesUtils; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.io.IOException; +import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.TestUtils; + +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Properties; /** * Implements a test client as described in the interop testing spec * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that * reacts to control message sequences send by the test {@link org.apache.qpid.interop.coordinator.Coordinator}. * - *

+ *

Messages Handled by TestClient
*
Messages Handled by SustainedTestClient
Message Action *
Invite(compulsory) Reply with Enlist. *
Invite(test case) Reply with Enlist if test case available. *
AssignRole(test case) Reply with Accept Role if matches an enlisted test. Keep test parameters. *
Start Send test messages defined by test parameters. Send report on messages sent. *
Status Request Send report on messages received. + *
Terminate Terminate the test client. *
* *

@@ -67,12 +62,11 @@ import java.util.Properties; */ public class TestClient implements MessageListener { + /** Used for debugging. */ private static Logger log = Logger.getLogger(TestClient.class); - public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; - public static final String CONNECTION_NAME = "broker"; + /** Holds the default identifying name of the test client. */ public static final String CLIENT_NAME = "java"; - public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; /** Holds the URL of the broker to run the tests on. */ public static String brokerUrl; @@ -80,17 +74,34 @@ public class TestClient implements MessageListener /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ public static String virtualHost; + /** + * Holds the test context properties that provides the default test parameters, plus command line overrides. + * This is initialized with the default test parameters, to which command line overrides may be applied. + */ + public static ParsedProperties testContextProperties = + TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + /** Holds all the test cases loaded from the classpath. */ Map testCases = new HashMap(); + /** Holds the test case currently being run by this client. */ protected InteropClientTestCase currentTestCase; - protected Connection _connection; + /** Holds the connection to the broker that the test is being coordinated on. */ + protected Connection connection; + + /** Holds the message producer to hold the test coordination over. */ protected MessageProducer producer; + + /** Holds the JMS session for the test coordination. */ protected Session session; + /** Holds the name of this client, with a default value. */ protected String clientName = CLIENT_NAME; + /** This flag indicates that the test client should attempt to join the currently running test case on start up. */ + protected boolean join; + /** * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client * identifying name. @@ -99,15 +110,16 @@ public class TestClient implements MessageListener * @param virtualHost The virtual host to conect to. * @param clientName The client name to use. */ - public TestClient(String brokerUrl, String virtualHost, String clientName) + public TestClient(String brokerUrl, String virtualHost, String clientName, boolean join) { - log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost - + ", String clientName = " + clientName + "): called"); + log.debug("public SustainedTestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + + ", String clientName = " + clientName + "): called"); // Retain the connection parameters. this.brokerUrl = brokerUrl; this.virtualHost = virtualHost; this.clientName = clientName; + this.join = join; } /** @@ -124,49 +136,40 @@ public class TestClient implements MessageListener */ public static void main(String[] args) { - // Use the command line parser to evaluate the command line. - CommandLineParser commandLine = - new CommandLineParser( + // Override the default broker url to be localhost:5672. + testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); + + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + // Any options and trailing name=value pairs are also injected into the test context properties object, + // to override any defaults that may have been set up. + ParsedProperties options = + new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args, + new uk.co.thebadgerset.junit.extensions.util.CommandLineParser( new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"n", "The test client name.", "name", "false"} - }); - - // Capture the command line arguments or display errors and correct usage and then exit. - Properties options = null; - - try - { - options = commandLine.parseCommandLine(args); - } - catch (IllegalArgumentException e) - { - System.out.println(commandLine.getErrors()); - System.out.println(commandLine.getUsage()); - System.exit(1); - } + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" }, + { "n", "The name of the test client.", "name", "false" }, + { "j", "Join this test client to running test.", "false" } + }), testContextProperties)); // Extract the command line options. String brokerUrl = options.getProperty("b"); String virtualHost = options.getProperty("h"); String clientName = options.getProperty("n"); - - // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up - // overridden values from there. - commandLine.addCommandLineToSysProperties(); + boolean join = options.getPropertyAsBoolean("j"); // Create a test client and start it running. - TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); + TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName, join); // Use a class path scanner to find all the interop test case implementations. + // Hard code the test classes till the classpath scanner is fixed. Collection> testCaseClasses = - new ArrayList>(); + new ArrayList>(); // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); - // Hard code the test classes till the classpath scanner is fixed. - Collections.addAll(testCaseClasses, - new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class}); + Collections.addAll(testCaseClasses, TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class); try { @@ -182,7 +185,10 @@ public class TestClient implements MessageListener /** * Starts the interop test client running. This causes it to start listening for incoming test invites. * - * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses + * @param testCaseClasses The classes of the available test cases. The test case names from these are used to + * matchin incoming test invites against. + * + * @throws JMSException Any underlying JMSExceptions are allowed to fall through. */ protected void start(Collection> testCaseClasses) throws JMSException { @@ -209,84 +215,36 @@ public class TestClient implements MessageListener } // Open a connection to communicate with the coordinator on. - _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost); - - session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection = TestUtils.createConnection(testContextProperties); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Set this up to listen for control messages. - MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName)); + Topic privateControlTopic = session.createTopic("iop.control." + clientName); + MessageConsumer consumer = session.createConsumer(privateControlTopic); consumer.setMessageListener(this); - MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control")); + Topic controlTopic = session.createTopic("iop.control"); + MessageConsumer consumer2 = session.createConsumer(controlTopic); consumer2.setMessageListener(this); // Create a producer to send replies with. producer = session.createProducer(null); - // Start listening for incoming control messages. - _connection.start(); - } - - - public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) - { - return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost); - } - - /** - * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple - * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that - * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler. - * - * @param connectionPropsResource The name of the connection properties file. - * @param clientID - * @param brokerUrl The broker url to connect to, null to use the default from the - * properties. - * @param virtualHost The virtual host to connectio to, null to use the default. - * - * @return A JMS conneciton. - * - * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a - * Utils library class. - */ - public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost) - { - log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource - + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID - + ", String virtualHost = " + virtualHost + " ): called"); - - try + // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client + // is available to join the current test case, if it supports it. This message may be ignored, or it may result + // in this test client receiving a test invite. + if (join) { - Properties connectionProps = - PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream( - connectionPropsResource)); - - if (brokerUrl != null) - { - String connectionString = - "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; - connectionProps.setProperty(CONNECTION_PROPERTY, connectionString); - } - - Context ctx = new InitialContext(connectionProps); - - ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); - Connection connection = cf.createConnection(); + Message joinMessage = session.createMessage(); - return connection; - } - catch (IOException e) - { - throw new RuntimeException(e); - } - catch (NamingException e) - { - throw new RuntimeException(e); - } - catch (JMSException e) - { - throw new RuntimeException(e); + joinMessage.setStringProperty("CONTROL_TYPE", "JOIN"); + joinMessage.setStringProperty("CLIENT_NAME", clientName); + joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + producer.send(controlTopic, joinMessage); } + + // Start listening for incoming control messages. + connection.start(); } /** @@ -394,16 +352,8 @@ public class TestClient implements MessageListener { log.info("Received termination instruction from coordinator."); -// try -// { -// currentTestCase.terminate(); -// } -// catch (InterruptedException e) -// { -// // -// } // Is a cleaner shutdown needed? - _connection.close(); + connection.close(); System.exit(0); } else diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java index 5f257c0b36..9629e79b2c 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -43,8 +43,15 @@ import javax.jms.Session; */ public class TestCase1DummyRun implements InteropClientTestCase { + /** Used for debugging. */ private static final Logger log = Logger.getLogger(TestCase1DummyRun.class); + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ public String getName() { log.debug("public String getName(): called"); @@ -52,6 +59,15 @@ public class TestCase1DummyRun implements InteropClientTestCase return "TC1_DummyRun"; } + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return true to accept the invitation, false to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ public boolean acceptInvite(Message inviteMessage) throws JMSException { log.debug("public boolean acceptInvite(Message inviteMessage): called"); @@ -60,6 +76,15 @@ public class TestCase1DummyRun implements InteropClientTestCase return true; } + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException { log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); @@ -67,6 +92,9 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing, both roles are the same. } + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + */ public void start() { log.debug("public void start(): called"); @@ -74,11 +102,15 @@ public class TestCase1DummyRun implements InteropClientTestCase // Do nothing. } - public void terminate() throws JMSException - { - //todo - } - + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ public Message getReport(Session session) throws JMSException { log.debug("public Message getReport(Session session): called"); @@ -87,6 +119,11 @@ public class TestCase1DummyRun implements InteropClientTestCase return session.createTextMessage("Dummy Run, Ok."); } + /** + * Handles incoming test messages. Does nothing. + * + * @param message The incoming test message. + */ public void onMessage(Message message) { log.debug("public void onMessage(Message message = " + message + "): called"); diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java index ff56ee9b93..c93d1ab828 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.interop.testclient.testcases; -import javax.jms.*; - import org.apache.log4j.Logger; import org.apache.qpid.interop.testclient.InteropClientTestCase; import org.apache.qpid.interop.testclient.TestClient; +import org.apache.qpid.test.framework.TestUtils; + +import javax.jms.*; /** * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the @@ -54,9 +55,6 @@ public class TestCase2BasicP2P implements InteropClientTestCase /** The number of test messages to send. */ private int numMessages; - /** The routing key to send them to on the default direct exchange. */ - private Destination sendDestination; - /** The connection to send the test messages on. */ private Connection connection; @@ -118,14 +116,12 @@ public class TestCase2BasicP2P implements InteropClientTestCase this.role = role; // Create a new connection to pass the test messages on. - connection = - TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl, - TestClient.virtualHost); + connection = TestUtils.createConnection(TestClient.testContextProperties); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Extract and retain the test parameters. numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); - sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); + Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); log.debug("numMessages = " + numMessages); log.debug("sendDestination = " + sendDestination); @@ -149,7 +145,9 @@ public class TestCase2BasicP2P implements InteropClientTestCase } /** - * Performs the test case actions. + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. */ public void start() throws JMSException { @@ -170,11 +168,6 @@ public class TestCase2BasicP2P implements InteropClientTestCase } } - public void terminate() throws JMSException - { - //todo - } - /** * Gets a report on the actions performed by the test case in its assigned role. * diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java index 7b35142c82..57e8634006 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -18,14 +18,15 @@ * under the License. * */ - package org.apache.qpid.interop.testclient.testcases; -import javax.jms.*; - import org.apache.log4j.Logger; import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.interop.testclient.TestClient; +import org.apache.qpid.test.framework.TestUtils; + +import javax.jms.*; /** * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the @@ -55,12 +56,6 @@ public class TestCase3BasicPubSub implements InteropClientTestCase /** The number of test messages to send. */ private int numMessages; - /** The number of receiver connection to use. */ - private int numReceivers; - - /** The routing key to send them to on the default direct exchange. */ - private Destination sendDestination; - /** The connections to send/receive the test messages on. */ private Connection[] connection; @@ -123,7 +118,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase // Extract and retain the test parameters. numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); - numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); + int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); log.debug("numMessages = " + numMessages); @@ -139,13 +134,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase connection = new Connection[1]; session = new Session[1]; - connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); + connection[0] = TestUtils.createConnection(TestClient.testContextProperties); session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); // Extract and retain the test parameters. - sendDestination = session[0].createTopic(sendKey); + Destination sendDestination = session[0].createTopic(sendKey); producer = session[0].createProducer(sendDestination); break; @@ -159,9 +152,7 @@ public class TestCase3BasicPubSub implements InteropClientTestCase for (int i = 0; i < numReceivers; i++) { - connection[i] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); + connection[i] = TestUtils.createConnection(TestClient.testContextProperties); session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); sendDestination = session[i].createTopic(sendKey); @@ -174,14 +165,16 @@ public class TestCase3BasicPubSub implements InteropClientTestCase } // Start all the connection dispatcher threads running. - for (int i = 0; i < connection.length; i++) + for (Connection conn : connection) { - connection[i].start(); + conn.start(); } } /** - * Performs the test case actions. + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. */ public void start() throws JMSException { @@ -202,11 +195,6 @@ public class TestCase3BasicPubSub implements InteropClientTestCase } } - public void terminate() throws JMSException, InterruptedException - { - //todo - } - /** * Gets a report on the actions performed by the test case in its assigned role. * @@ -221,9 +209,9 @@ public class TestCase3BasicPubSub implements InteropClientTestCase log.debug("public Message getReport(Session session): called"); // Close the test connections. - for (int i = 0; i < connection.length; i++) + for (Connection conn : connection) { - connection[i].close(); + conn.close(); } // Generate a report message containing the count of the number of messages passed. diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java new file mode 100644 index 0000000000..71ab38ec0a --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java @@ -0,0 +1,905 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.interop.testclient.TestClient; +import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; +import org.apache.qpid.test.framework.TestUtils; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of + * messages sent/received. + * + *

CRC Card
+ *
CRC Card
Responsibilities Collaborations + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Send required number of test messages using pub/sub.
Generate test reports. + *
+ */ +public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(SustainedClientTestCase.class); + + /** Used to log to the console. */ + private static final Logger console = Logger.getLogger("SustainedTest"); + + /** The role to be played by the test. */ + private Roles role; + + /** The number of receiver connection to use. */ + private int numReceivers; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The routing key to send updates to on the default direct exchange. */ + private Destination sendUpdateDestination; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** Adapter that adjusts the send rate based on the updates from clients. */ + SustainedRateAdapter _rateAdapter; + + /** */ + int _batchSize; + + private static final long TEN_MILLI_SEC = 10000000; + private static final int DEBUG_LOG_UPATE_INTERVAL = 10; + private static final int LOG_UPATE_INTERVAL = 10; + private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the interop + * testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "Perf_SustainedPubSub"; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment + * message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); + _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); + String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); + String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); + int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); + + if (log.isDebugEnabled()) + { + log.debug("numReceivers = " + numReceivers); + log.debug("_batchSize = " + _batchSize); + log.debug("ackMode = " + ackMode); + log.debug("sendKey = " + sendKey); + log.debug("sendUpdateKey = " + sendUpdateKey); + log.debug("role = " + role); + } + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + console.info("Creating Sender"); + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = TestUtils.createConnection(TestClient.testContextProperties); + session[0] = connection[0].createSession(false, ackMode); + + // Extract and retain the test parameters. + sendDestination = session[0].createTopic(sendKey); + + connection[0].setExceptionListener(this); + + producer = session[0].createProducer(sendDestination); + + sendUpdateDestination = session[0].createTopic(sendUpdateKey); + MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); + + _rateAdapter = new SustainedRateAdapter(this); + updateConsumer.setMessageListener(_rateAdapter); + + break; + + // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number + // of receiver connections. + case RECEIVER: + console.info("Creating Receiver"); + // Create the required number of receiver connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = TestUtils.createConnection(TestClient.testContextProperties); + session[i] = connection[i].createSession(false, ackMode); + + sendDestination = session[i].createTopic(sendKey); + + sendUpdateDestination = session[i].createTopic(sendUpdateKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + + consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], + sendUpdateDestination)); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (int i = 0; i < connection.length; i++) + { + connection[i].start(); + } + } + + /** Performs the test case actions. */ + public void start() throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + _rateAdapter.run(); + break; + case RECEIVER: + + } + + // return from here when you have finished the test.. this will signal the controller and + } + + public void terminate() throws JMSException, InterruptedException + { + if (_rateAdapter != null) + { + _rateAdapter.stop(); + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Close the test connections. + for (int i = 0; i < connection.length; i++) + { + connection[i].close(); + } + + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + + return report; + } + + public void onException(JMSException jmsException) + { + Exception linked = jmsException.getLinkedException(); + + if (linked != null) + { + if (log.isDebugEnabled()) + { + log.debug("Linked Exception:" + linked); + } + + if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException)) + { + if (log.isDebugEnabled()) + { + if (linked instanceof AMQNoConsumersException) + { + log.warn("No clients currently available for message:" + + ((AMQNoConsumersException) linked).getUndeliveredMessage()); + } + else + { + log.warn("No route for message"); + } + } + + // Tell the rate adapter that there are no clients ready yet + _rateAdapter.NO_CLIENTS = true; + } + } + else + { + log.warn("Exception:" + linked); + } + } + + /** + * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and + * 'end' messages. + */ + class SustainedListener implements MessageListener + { + /** Number of messages received */ + private long _received = 0; + /** The number of messages in the batch */ + private int _batchSize = 0; + /** Record of the when the 'start' messagse was sen */ + private Long _startTime; + /** Message producer to use to send reports */ + MessageProducer _updater; + /** Session to create the report message on */ + Session _session; + /** Record of the client ID used for this SustainedListnener */ + String _client; + + /** + * Main Constructor + * + * @param clientname The _client id used to identify this connection. + * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to + * control the interval between sending reports. + * @param session The session used for communication. + * @param sendDestination The destination that update reports should be sent to. + * + * @throws JMSException My occur if creatingthe Producer fails + */ + public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) + throws JMSException + { + _batchSize = batchSize; + _client = clientname; + _session = session; + _updater = session.createProducer(sendDestination); + } + + public void onMessage(Message message) + { + if (log.isTraceEnabled()) + { + log.trace("Message " + _received + "received in listener"); + } + + if (message instanceof TextMessage) + { + try + { + _received++; + if (((TextMessage) message).getText().equals("start")) + { + log.debug("Starting Batch"); + _startTime = System.nanoTime(); + } + else if (((TextMessage) message).getText().equals("end")) + { + if (_startTime != null) + { + long currentTime = System.nanoTime(); + sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); + log.debug("End Batch"); + } + } + } + catch (JMSException e) + { + // ignore error + } + } + + } + + /** + * sendStatus creates and sends the report back to the publisher + * + * @param time taken for the the last batch + * @param received Total number of messages received. + * @param batchNumber the batch number + * @throws JMSException if an error occurs during the send + */ + private void sendStatus(long time, long received, int batchNumber) throws JMSException + { + Message updateMessage = _session.createTextMessage("update"); + updateMessage.setStringProperty("CLIENT_ID", ":" + _client); + updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); + updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setIntProperty("BATCH", batchNumber); + updateMessage.setLongProperty("DURATION", time); + + if (log.isInfoEnabled()) + { + log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); + } + + // Output on the main console.info the details of this batch + if ((batchNumber % 10) == 0) + { + console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); + } + + _updater.send(updateMessage); + } + } + + /** + * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second + * that are sent through the test system. + * + * By keeping a record of the messages recevied and the average time taken to process the batch size can be + * calculated and so the delay can be adjusted to maintain that rate. + * + * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no + * messages in the batch. Otherwise the delay is used at the end of the batch. + */ + class SustainedRateAdapter implements MessageListener, Runnable + { + private SustainedClientTestCase _client; + private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting + private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) + private volatile long _delay; // in nanos + private long _sent; + private Map _slowClients = new HashMap(); + private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms + private static final long NO_CLIENT_SLEEP = 1000; // 1s + private volatile boolean NO_CLIENTS = true; + private int _delayShifting; + private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); + private boolean _warmedup = false; + private static final long EXPECTED_TIME_PER_BATCH = 100000L; + private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); + + SustainedRateAdapter(SustainedClientTestCase client) + { + _client = client; + } + + public void onMessage(Message message) + { + if (log.isDebugEnabled()) + { + log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); + } + + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + + // Check if the message is a test invite. + if ("UPDATE".equals(controlType)) + { + NO_CLIENTS = false; + long duration = message.getLongProperty("DURATION"); + long totalReceived = message.getLongProperty("RECEIVED"); + String client = message.getStringProperty("CLIENT_ID"); + int batchNumber = message.getIntProperty("BATCH"); + + if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:" + + batchNumber + " DURATION:" + duration); + } + + recordSlow(client, totalReceived, batchNumber); + + adjustDelay(client, batchNumber, duration); + + // Warm up completes when: + // we haven't warmed up + // and the number of batches sent to each client is at least half of the required warmup batches + if (!_warmedup && (batchNumber >= _warmUpBatches)) + { + _warmedup = true; + _warmup.countDown(); + + } + } + } + catch (JMSException e) + { + // + } + } + + CountDownLatch _warmup = new CountDownLatch(1); + + int _numBatches = 10000; + + // long[] _timings = new long[_numBatches]; + private boolean _running = true; + + public void run() + { + console.info("Warming up"); + + doBatch(_warmUpBatches); + + try + { + // wait for warmup to complete. + _warmup.await(); + + // set delay to the average length of the batches + _delay = _totalDuration / _warmUpBatches / delays.size(); + + console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration + + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size()); + + _totalDuration = 0L; + _totalReceived = 0L; + _sent = 0L; + } + catch (InterruptedException e) + { + // + } + + doBatch(_numBatches); + + } + + private void doBatch(int batchSize) // long[] timings, + { + TextMessage testMessage = null; + try + { + testMessage = _client.session[0].createTextMessage("start"); + + for (int batch = 0; batch <= batchSize; batch++) + // while (_running) + { + long start = System.nanoTime(); + + testMessage.setText("start"); + testMessage.setIntProperty("BATCH", batch); + + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + testMessage.setText("test"); + // start at 2 so start and end count as part of batch + for (int m = 2; m < _batchSize; m++) + { + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + } + + testMessage.setText("end"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + long end = System.nanoTime(); + + long sendtime = end - start; + + if (log.isDebugEnabled()) + { + log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]); + } + + if ((batch % LOG_UPATE_INTERVAL) == 0) + { + console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); + } + + _rateAdapter.sleepBatch(); + + } + } + catch (JMSException e) + { + console.error("Runner ended"); + } + } + + private String status() + { + return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay + + " resulting in " + + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")); + } + + private void sleepBatch() + { + if (checkForSlowClients()) + { // if there werwe slow clients we have already slept so don't sleep anymore again. + return; + } + + if (!SLEEP_PER_MESSAGE) + { + // per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= (TEN_MILLI_SEC * _batchSize)) + { + sleepLong(_delay); + } + else + { + log.info("Not sleeping _delay > ten*batch is:" + _delay); + } + } + } + + public void stop() + { + _running = false; + } + + Map delays = new HashMap(); + Long _totalReceived = 0L; + Long _totalDuration = 0L; + int _skipUpdate = 0; + + /** + * Adjust the delay for sending messages based on this update from the client + * + * @param client The client that send this update + * @param duration The time taken for the last batch of messagse + * @param batchNumber The reported batchnumber from the client + */ + private void adjustDelay(String client, int batchNumber, long duration) + { + // Retrieve the current total time taken for this client. + Long currentTime = delays.get(client); + + // Add the new duration time to this client + if (currentTime == null) + { + currentTime = duration; + } + else + { + currentTime += duration; + } + + delays.put(client, currentTime); + + long batchesSent = _sent / _batchSize; + + // ensure we don't divide by zero + if (batchesSent == 0) + { + batchesSent = 1L; + } + + _totalReceived += _batchSize; + _totalDuration += duration; + + // calculate average duration accross clients per batch + long averageDuration = _totalDuration / delays.size() / batchesSent; + + // calculate the difference between current send delay and average report delay + long diff = (duration) - averageDuration; + + if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: " + + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " + + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in " + + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"))); + } + + // if the averageDuration differs from the current by more than the specified variane then adjust delay. + if (Math.abs(diff) > _timeVariance) + { + + // if the the _delay is larger than the required duration to send report + // speed up + if (diff > TEN_MILLI_SEC) + { + _delay -= TEN_MILLI_SEC; + + if (_delay < 0) + { + _delay = 0; + log.info("Reset _delay to 0"); + delayStable(); + } + else + { + delayChanged(); + } + + } + else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance + { + // the report took longer + _delay += TEN_MILLI_SEC; + delayChanged(); + } + } + else + { + delayStable(); + } + + // If we have a consumer that is behind with the batches. + if ((batchesSent - batchNumber) > _batchVariance) + { + log.debug("Increasing _delay as sending more than receiving"); + + _delay += 2 * TEN_MILLI_SEC; + delayChanged(); + } + + } + + /** Reset the number of iterations before we say the delay has stabilised. */ + private void delayChanged() + { + _delayShifting = REPORTS_WITHOUT_CHANGE; + } + + /** + * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will + * output Delay stabilised + */ + private void delayStable() + { + _delayShifting--; + + if (_delayShifting < 0) + { + _delayShifting = 0; + console.debug("Delay stabilised:" + _delay); + } + } + + /** + * Checks that the client has received enough messages. If the client has fallen behind then they are put in the + * _slowClients lists which will increase the delay. + * + * @param client The client identifier to check + * @param received the number of messages received by that client + * @param batchNumber + */ + private void recordSlow(String client, long received, int batchNumber) + { + if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) + { + _slowClients.put(client, received); + } + else + { + _slowClients.remove(client); + } + } + + /** Incrment the number of sent messages and then sleep, if required. */ + public void sentMessage() + { + + _sent++; + + if (_delay > (TEN_MILLI_SEC * _batchSize)) + { + long batchDelay = _delay / _batchSize; + // less than 10ms sleep doesn't always work. + // _delay is in nano seconds + // if (batchDelay < (TEN_MILLI_SEC)) + // { + // sleep(0, (int) batchDelay); + // } + // else + { + // if (batchDelay < 30000000000L) + { + sleepLong(batchDelay); + } + } + } + else + { + if (SLEEP_PER_MESSAGE && (_delay > 0)) + { + sleepLong(_delay / _batchSize); + } + } + } + + /** + * Check at the end of each batch and pause sending messages to allow slow clients to catch up. + * + * @return true if there were slow clients that caught up. + */ + private boolean checkForSlowClients() + { + // This will allways be true as we are running this at the end of each batchSize + // if (_sent % _batchSize == 0) + { + // Cause test to pause when we have slow + if (!_slowClients.isEmpty() || NO_CLIENTS) + { + + while (!_slowClients.isEmpty()) + { + if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + String clients = ""; + Iterator it = _slowClients.keySet().iterator(); + while (it.hasNext()) + { + clients += it.next(); + if (it.hasNext()) + { + clients += ", "; + } + } + + log.info("Pausing for slow clients:" + clients); + } + + if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)) + { + console.debug(_slowClients.size() + " slow clients."); + } + + sleep(PAUSE_SLEEP); + } + + if (NO_CLIENTS) + { + sleep(NO_CLIENT_SLEEP); + } + + log.debug("Continuing"); + + return true; + } + else + { + if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0) + { + console.info("Total Delay :" + _delay + " " + + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")"))); + } + } + + } + + return false; + } + + /** + * Sleep normally takes micro-seconds this allows the use of a nano-second value. + * + * @param delay nanoseconds to sleep for. + */ + private void sleepLong(long delay) + { + sleep(delay / 1000000, (int) (delay % 1000000)); + } + + /** + * Sleep for the specified micro-seconds. + * @param sleep microseconds to sleep for. + */ + private void sleep(long sleep) + { + sleep(sleep, 0); + } + + /** + * Perform the sleep , swallowing any InteruptException. + * + * NOTE: If a sleep request is > 10s then reset only sleep for 5s + * + * @param milli to sleep for + * @param nano sub miliseconds to sleep for + */ + private void sleep(long milli, int nano) + { + try + { + log.debug("Sleep:" + milli + ":" + nano); + if (milli > 10000) + { + + if (_delay == milli) + { + _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; + log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + + "s. Reset _totalDuration:" + _totalDuration); + } + else + { + log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s"); + } + + milli = 5000; + } + + Thread.sleep(milli, nano); + } + catch (InterruptedException e) + { + // + } + } + + public void setClient(SustainedClientTestCase client) + { + _client = client; + } + } + +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java new file mode 100644 index 0000000000..3dd1326d80 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.interop.coordinator.DropInTest; +import org.apache.qpid.interop.coordinator.TestClientDetails; +import org.apache.qpid.interop.coordinator.FanOutTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.HashMap; +import java.util.Map; + +/** + * SustainedTestCase is a {@link FanOutTestCase} that runs the "Perf_SustainedPubSub" test case. This consists of one + * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast + * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ *
+ */ +public class SustainedTestCase extends FanOutTestCase implements DropInTest +{ + /** Used for debugging. */ + Logger log = Logger.getLogger(SustainedTestCase.class); + + /** Holds the root name of the topic on which to send the test messages. */ + private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public SustainedTestCase(String name) + { + super(name); + } + + /** + * Performs a single test run of the sustained test. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testSinglePubSubCycle(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); + testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); + testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); + testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); + testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); + + log.info("Created Config: " + testConfig.entrySet().toArray()); + + sequenceTest(testConfig); + } + + /** + * Accepts a late joining client into this test case. The client will be enlisted with a control message + * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields: + * + *

+ *
CLIENT_NAME A unique name for the new client. + *
CLIENT_PRIVATE_CONTROL_KEY The key for the route on which the client receives its control messages. + *
+ * + * @param message The late joiners join message. + * + * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed. + */ + public void lateJoin(Message message) throws JMSException + { + // Extract the joining clients details from its join request message. + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + // Register the joining client, but do block for confirmation as cannot do a synchronous receiver during this + // method call, as it may have been called from an 'onMessage' method. + assignReceiverRole(clientDetails, new HashMap(), false); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "Perf_SustainedPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java deleted file mode 100644 index 79707bafa5..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ /dev/null @@ -1,931 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -/** - * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the - * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of - * messages sent/received. - * - *

CRC Card
Responsibilities Collaborations
Supply the name - * of the test case that this implements.
Accept/Reject invites based on test parameters.
Adapt to - * assigned roles.
Send required number of test messages using pub/sub.
Generate test reports. - *
- */ -public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener -{ - /** Used for debugging. */ - private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class); - - private static final Logger log = Logger.getLogger("SustainedTest"); - - - /** The role to be played by the test. */ - private Roles role; - - /** The number of test messages to send. */ -// private int numMessages; - - /** The number of receiver connection to use. */ - private int numReceivers; - - /** The routing key to send them to on the default direct exchange. */ - private Destination sendDestination; - - /** The routing key to send updates to on the default direct exchange. */ - private Destination sendUpdateDestination; - - - /** The connections to send/receive the test messages on. */ - private Connection[] connection; - - /** The sessions to send/receive the test messages on. */ - private Session[] session; - - /** The producer to send the test messages with. */ - MessageProducer producer; - - /** Adapter that adjusts the send rate based on the updates from clients. */ - SustainedRateAdapter _rateAdapter; - - /** */ - int _batchSize; - - - private static final long TEN_MILLI_SEC = 10000000; - private static final int DEBUG_LOG_UPATE_INTERVAL = 10; - private static final int LOG_UPATE_INTERVAL = 10; - private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the interop - * testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - debugLog.debug("public String getName(): called"); - - return "Perf_SustainedPubSub"; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment - * message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receiver. - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); - - // Take note of the role to be played. - this.role = role; - - // Extract and retain the test parameters. - numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); - _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); - String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); - String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); - int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); - String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); - - if (debugLog.isDebugEnabled()) - { - debugLog.debug("numReceivers = " + numReceivers); - debugLog.debug("_batchSize = " + _batchSize); - debugLog.debug("ackMode = " + ackMode); - debugLog.debug("sendKey = " + sendKey); - debugLog.debug("sendUpdateKey = " + sendUpdateKey); - debugLog.debug("role = " + role); - } - - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - log.info("Creating Sender"); - // Create a new connection to pass the test messages on. - connection = new Connection[1]; - session = new Session[1]; - - connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, - clientName, - org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); - session[0] = connection[0].createSession(false, ackMode); - - // Extract and retain the test parameters. - sendDestination = session[0].createTopic(sendKey); - - connection[0].setExceptionListener(this); - - producer = session[0].createProducer(sendDestination); - - sendUpdateDestination = session[0].createTopic(sendUpdateKey); - MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); - - _rateAdapter = new SustainedRateAdapter(this); - updateConsumer.setMessageListener(_rateAdapter); - - - break; - - // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number - // of receiver connections. - case RECEIVER: - log.info("Creating Receiver"); - // Create the required number of receiver connections. - connection = new Connection[numReceivers]; - session = new Session[numReceivers]; - - for (int i = 0; i < numReceivers; i++) - { - connection[i] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, - clientName, - org.apache.qpid.interop.testclient.TestClient.brokerUrl, - org.apache.qpid.interop.testclient.TestClient.virtualHost); - session[i] = connection[i].createSession(false, ackMode); - - sendDestination = session[i].createTopic(sendKey); - - sendUpdateDestination = session[i].createTopic(sendUpdateKey); - - MessageConsumer consumer = session[i].createConsumer(sendDestination); - - consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination)); - } - - break; - } - - // Start all the connection dispatcher threads running. - for (int i = 0; i < connection.length; i++) - { - connection[i].start(); - } - } - - - /** Performs the test case actions. */ - public void start() throws JMSException - { - debugLog.debug("public void start(): called"); - - // Check that the sender role is being performed. - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - _rateAdapter.run(); - break; - case RECEIVER: - - } - - //return from here when you have finished the test.. this will signal the controller and - } - - public void terminate() throws JMSException, InterruptedException - { - if (_rateAdapter != null) - { - _rateAdapter.stop(); - } - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - debugLog.debug("public Message getReport(Session session): called"); - - // Close the test connections. - for (int i = 0; i < connection.length; i++) - { - connection[i].close(); - } - - Message report = session.createMessage(); - report.setStringProperty("CONTROL_TYPE", "REPORT"); - - return report; - } - - public void onException(JMSException jmsException) - { - Exception linked = jmsException.getLinkedException(); - - if (linked != null) - { - if (debugLog.isDebugEnabled()) - { - debugLog.debug("Linked Exception:" + linked); - } - if ((linked instanceof AMQNoRouteException) - || (linked instanceof AMQNoConsumersException)) - { - if (debugLog.isDebugEnabled()) - { - if (linked instanceof AMQNoConsumersException) - { - debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage()); - } - else - { - debugLog.warn("No route for message"); - } - } - - // Tell the rate adapter that there are no clients ready yet - _rateAdapter.NO_CLIENTS = true; - } - } - else - { - debugLog.warn("Exception:" + linked); - } - } - - /** - * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and - * 'end' messages. - */ - class SustainedListener implements MessageListener - { - /** Number of messages received */ - private long _received = 0; - /** The number of messages in the batch */ - private int _batchSize = 0; - /** Record of the when the 'start' messagse was sen */ - private Long _startTime; - /** Message producer to use to send reports */ - MessageProducer _updater; - /** Session to create the report message on */ - Session _session; - /** Record of the client ID used for this SustainedListnener */ - String _client; - - - /** - * Main Constructor - * - * @param clientname The _client id used to identify this connection. - * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to - * control the interval between sending reports. - * @param session The session used for communication. - * @param sendDestination The destination that update reports should be sent to. - * - * @throws JMSException My occur if creatingthe Producer fails - */ - public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException - { - _batchSize = batchSize; - _client = clientname; - _session = session; - _updater = session.createProducer(sendDestination); - } - - public void onMessage(Message message) - { - if (debugLog.isTraceEnabled()) - { - debugLog.trace("Message " + _received + "received in listener"); - } - - - if (message instanceof TextMessage) - { - try - { - _received++; - if (((TextMessage) message).getText().equals("start")) - { - debugLog.debug("Starting Batch"); - _startTime = System.nanoTime(); - } - else if (((TextMessage) message).getText().equals("end")) - { - if (_startTime != null) - { - long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); - debugLog.debug("End Batch"); - } - } - } - catch (JMSException e) - { - //ignore error - } - } - - } - - /** - * sendStatus creates and sends the report back to the publisher - * - * @param time taken for the the last batch - * @param received Total number of messages received. - * @param batchNumber the batch number - * @throws JMSException if an error occurs during the send - */ - private void sendStatus(long time, long received, int batchNumber) throws JMSException - { - Message updateMessage = _session.createTextMessage("update"); - updateMessage.setStringProperty("CLIENT_ID", ":" + _client); - updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); - updateMessage.setLongProperty("RECEIVED", received); - updateMessage.setIntProperty("BATCH", batchNumber); - updateMessage.setLongProperty("DURATION", time); - - if (debugLog.isInfoEnabled()) - { - debugLog.info("**** SENDING [" + batchNumber + "]**** " - + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - // Output on the main log.info the details of this batch - if (batchNumber % 10 == 0) - { - log.info("Sending Report [" + batchNumber + "] " - + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - _updater.send(updateMessage); - } - } - - - /** - * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second - * that are sent through the test system. - * - * By keeping a record of the messages recevied and the average time taken to process the batch size can be - * calculated and so the delay can be adjusted to maintain that rate. - * - * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no - * messages in the batch. Otherwise the delay is used at the end of the batch. - */ - class SustainedRateAdapter implements MessageListener, Runnable - { - private SustainedTestClient _client; - private long _batchVariance = Integer.getInteger("batchVariance", 3); //no. batches to allow drifting - private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) - private volatile long _delay; //in nanos - private long _sent; - private Map _slowClients = new HashMap(); - private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms - private static final long NO_CLIENT_SLEEP = 1000; // 1s - private volatile boolean NO_CLIENTS = true; - private int _delayShifting; - private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); - private boolean _warmedup = false; - private static final long EXPECTED_TIME_PER_BATCH = 100000L; - private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); - - - SustainedRateAdapter(SustainedTestClient client) - { - _client = client; - } - - public void onMessage(Message message) - { - if (debugLog.isDebugEnabled()) - { - debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); - } - - try - { - String controlType = message.getStringProperty("CONTROL_TYPE"); - - // Check if the message is a test invite. - if ("UPDATE".equals(controlType)) - { - NO_CLIENTS = false; - long duration = message.getLongProperty("DURATION"); - long totalReceived = message.getLongProperty("RECEIVED"); - String client = message.getStringProperty("CLIENT_ID"); - int batchNumber = message.getIntProperty("BATCH"); - - if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) - { - debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived - + " Recevied BATCH:" + batchNumber + " DURATION:" + duration); - } - - recordSlow(client, totalReceived, batchNumber); - - adjustDelay(client, batchNumber, duration); - - // Warm up completes when: - // we haven't warmed up - // and the number of batches sent to each client is at least half of the required warmup batches - if (!_warmedup - && (batchNumber >= _warmUpBatches)) - { - _warmedup = true; - _warmup.countDown(); - - } - } - } - catch (JMSException e) - { - // - } - } - - CountDownLatch _warmup = new CountDownLatch(1); - - int _numBatches = 10000; - - // long[] _timings = new long[_numBatches]; - private boolean _running = true; - - - public void run() - { - log.info("Warming up"); - - doBatch(_warmUpBatches); - - try - { - //wait for warmup to complete. - _warmup.await(); - - //set delay to the average length of the batches - _delay = _totalDuration / _warmUpBatches / delays.size(); - - log.info("Warmup complete delay set : " + _delay - + " based on _totalDuration: " + _totalDuration - + " over no. batches: " + _warmUpBatches - + " with client count: " + delays.size()); - - _totalDuration = 0L; - _totalReceived = 0L; - _sent = 0L; - } - catch (InterruptedException e) - { - // - } - - - doBatch(_numBatches); - - } - - private void doBatch(int batchSize) // long[] timings, - { - TextMessage testMessage = null; - try - { - testMessage = _client.session[0].createTextMessage("start"); - - - for (int batch = 0; batch <= batchSize; batch++) -// while (_running) - { - long start = System.nanoTime(); - - testMessage.setText("start"); - testMessage.setIntProperty("BATCH", batch); - - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - testMessage.setText("test"); - //start at 2 so start and end count as part of batch - for (int m = 2; m < _batchSize; m++) - { - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - } - - testMessage.setText("end"); - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - long end = System.nanoTime(); - - long sendtime = end - start; - - if (debugLog.isDebugEnabled()) - { - debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); - } - - if (batch % LOG_UPATE_INTERVAL == 0) - { - log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); - } - - _rateAdapter.sleepBatch(); - - } - } - catch (JMSException e) - { - log.error("Runner ended"); - } - } - - private String status() - { - return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" - + " Delay is " + _delay + " resulting in " - + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch"); - } - - private void sleepBatch() - { - if (checkForSlowClients()) - {//if there werwe slow clients we have already slept so don't sleep anymore again. - return; - } - - if (!SLEEP_PER_MESSAGE) - { - //per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= TEN_MILLI_SEC * _batchSize) - { - sleepLong(_delay); - } - else - { - debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); - } - } - } - - public void stop() - { - _running = false; - } - - Map delays = new HashMap(); - Long _totalReceived = 0L; - Long _totalDuration = 0L; - int _skipUpdate = 0; - - /** - * Adjust the delay for sending messages based on this update from the client - * - * @param client The client that send this update - * @param duration The time taken for the last batch of messagse - * @param batchNumber The reported batchnumber from the client - */ - private void adjustDelay(String client, int batchNumber, long duration) - { - //Retrieve the current total time taken for this client. - Long currentTime = delays.get(client); - - // Add the new duration time to this client - if (currentTime == null) - { - currentTime = duration; - } - else - { - currentTime += duration; - } - - delays.put(client, currentTime); - - long batchesSent = _sent / _batchSize; - - // ensure we don't divide by zero - if (batchesSent == 0) - { - batchesSent = 1L; - } - - _totalReceived += _batchSize; - _totalDuration += duration; - - //calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchesSent; - - //calculate the difference between current send delay and average report delay - long diff = (duration) - averageDuration; - - if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) - { - debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." - + " on batch: " + batchesSent - + " received batch: " + batchNumber - + " Batch Duration: " + duration - + " Average: " + averageDuration - + " so diff: " + diff + " for : " + client - + " Delay is " + _delay + " resulting in " - + ((_delay > TEN_MILLI_SEC * _batchSize) - ? (_delay / _batchSize) + "/msg" : _delay + "/batch")); - } - - //if the averageDuration differs from the current by more than the specified variane then adjust delay. - if (Math.abs(diff) > _timeVariance) - { - - // if the the _delay is larger than the required duration to send report - // speed up - if (diff > TEN_MILLI_SEC) - { - _delay -= TEN_MILLI_SEC; - - if (_delay < 0) - { - _delay = 0; - debugLog.info("Reset _delay to 0"); - delayStable(); - } - else - { - delayChanged(); - } - - } - else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance - { - // the report took longer - _delay += TEN_MILLI_SEC; - delayChanged(); - } - } - else - { - delayStable(); - } - - // If we have a consumer that is behind with the batches. - if (batchesSent - batchNumber > _batchVariance) - { - debugLog.debug("Increasing _delay as sending more than receiving"); - - _delay += 2 * TEN_MILLI_SEC; - delayChanged(); - } - - - } - - /** Reset the number of iterations before we say the delay has stabilised. */ - private void delayChanged() - { - _delayShifting = REPORTS_WITHOUT_CHANGE; - } - - /** - * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will - * output Delay stabilised - */ - private void delayStable() - { - _delayShifting--; - - if (_delayShifting < 0) - { - _delayShifting = 0; - log.debug("Delay stabilised:" + _delay); - } - } - - /** - * Checks that the client has received enough messages. If the client has fallen behind then they are put in the - * _slowClients lists which will increase the delay. - * - * @param client The client identifier to check - * @param received the number of messages received by that client - * @param batchNumber - */ - private void recordSlow(String client, long received, int batchNumber) - { - if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) - { - _slowClients.put(client, received); - } - else - { - _slowClients.remove(client); - } - } - - /** Incrment the number of sent messages and then sleep, if required. */ - public void sentMessage() - { - - _sent++; - - if (_delay > TEN_MILLI_SEC * _batchSize) - { - long batchDelay = _delay / _batchSize; - // less than 10ms sleep doesn't always work. - // _delay is in nano seconds -// if (batchDelay < (TEN_MILLI_SEC)) -// { -// sleep(0, (int) batchDelay); -// } -// else - { -// if (batchDelay < 30000000000L) - { - sleepLong(batchDelay); - } - } - } - else - { - if (SLEEP_PER_MESSAGE && (_delay > 0)) - { - sleepLong(_delay / _batchSize); - } - } - } - - - /** - * Check at the end of each batch and pause sending messages to allow slow clients to catch up. - * - * @return true if there were slow clients that caught up. - */ - private boolean checkForSlowClients() - { - // This will allways be true as we are running this at the end of each batchSize -// if (_sent % _batchSize == 0) - { - // Cause test to pause when we have slow - if (!_slowClients.isEmpty() || NO_CLIENTS) - { - - - while (!_slowClients.isEmpty()) - { - if (debugLog.isInfoEnabled() - && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0) - { - String clients = ""; - Iterator it = _slowClients.keySet().iterator(); - while (it.hasNext()) - { - clients += it.next(); - if (it.hasNext()) - { - clients += ", "; - } - } - debugLog.info("Pausing for slow clients:" + clients); - } - - - if (log.isDebugEnabled() - && _sent / _batchSize % LOG_UPATE_INTERVAL == 0) - { - log.debug(_slowClients.size() + " slow clients."); - } - sleep(PAUSE_SLEEP); - } - - if (NO_CLIENTS) - { - sleep(NO_CLIENT_SLEEP); - } - - debugLog.debug("Continuing"); - return true; - } - else - { - if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0) - { - log.info("Total Delay :" + _delay + " " - + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")")); - } - } - - } - - return false; - } - - /** - * Sleep normally takes micro-seconds this allows the use of a nano-second value. - * - * @param delay nanoseconds to sleep for. - */ - private void sleepLong(long delay) - { - sleep(delay / 1000000, (int) (delay % 1000000)); - } - - /** - * Sleep for the specified micro-seconds. - * @param sleep microseconds to sleep for. - */ - private void sleep(long sleep) - { - sleep(sleep, 0); - } - - /** - * Perform the sleep , swallowing any InteruptException. - * - * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * - * @param milli to sleep for - * @param nano sub miliseconds to sleep for - */ - private void sleep(long milli, int nano) - { - try - { - debugLog.debug("Sleep:" + milli + ":" + nano); - if (milli > 10000) - { - - if (_delay == milli) - { - _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; - debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration); - } - else - { - debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s"); - } - - milli = 5000; - } - - Thread.sleep(milli, nano); - } - catch (InterruptedException e) - { - // - } - } - - public void setClient(SustainedTestClient client) - { - _client = client; - } - } - -} - diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java deleted file mode 100644 index 0075e45a8c..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest; -import org.apache.qpid.interop.coordinator.TestClientDetails; -import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; -import org.apache.qpid.util.ConversationFactory; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class); - private List _receivers; - private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; - Map _testProperties; - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public SustainedTestCoordinator(String name) - { - super(name); - _receivers = new LinkedList(); - } - - /** - * Adds a receiver to this test. - * - * @param receiver The contact details of the sending client in the test. - */ - public void setReceiver(TestClientDetails receiver) - { - _receivers.add(receiver); - } - - - /** - * Performs the a single test run - * - * @throws Exception if there was a problem running the test. - */ - public void testBasicPubSub() throws Exception - { - log.debug("public void testSinglePubSubCycle(): called"); - - Map testConfig = new HashMap(); - testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); - testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); - testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); - testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); - testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); - testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); - - log.info("Created Config: " + testConfig.entrySet().toArray()); - - sequenceTest(testConfig); - } - - /** - * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop - * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the - * participants. - * - * @param testProperties The test case definition. - * - * @return The test results from the senders and receivers. - * - * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. - */ - protected Message[] sequenceTest(Map testProperties) throws JMSException - { - log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); - - Session session = conversationFactory.getSession(); - Destination senderControlTopic = session.createTopic(sender.privateControlKey); - - ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); - - // Assign the sender role to the sending test client. - Message assignSender = conversationFactory.getSession().createMessage(); - setPropertiesOnMessage(assignSender, testProperties); - assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignSender.setStringProperty("ROLE", "SENDER"); - assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); - - senderConversation.send(senderControlTopic, assignSender); - - //Assign and wait for the receiver ckuebts to be ready. - _testProperties = testProperties; - - // Wait for the senders to confirm their roles. - senderConversation.receive(); - - assignReceivers(); - - // Start the test. - Message start = session.createMessage(); - start.setStringProperty("CONTROL_TYPE", "START"); - - senderConversation.send(senderControlTopic, start); - - // Wait for the test sender to return its report. - Message senderReport = senderConversation.receive(); - - try - { - Thread.sleep(500); - } - catch (InterruptedException e) - { - } - - // Ask the receiver for its report. - Message statusRequest = session.createMessage(); - statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); - - - return new Message[]{senderReport}; - } - - private void assignReceivers() - { - for (TestClientDetails receiver : _receivers) - { - registerReceiver(receiver); - } - } - - private void registerReceiver(TestClientDetails receiver) - { - log.info("registerReceiver called for receiver:" + receiver); - try - { - Session session = conversationFactory.getSession(); - Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); - ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); - // Assign the receiver role the receiving client. - Message assignReceiver = session.createMessage(); - setPropertiesOnMessage(assignReceiver, _testProperties); - assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); - assignReceiver.setStringProperty("ROLE", "RECEIVER"); - assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); - - receiverConversation.send(receiverControlTopic, assignReceiver); - - //Don't wait for receiver to be ready.... we can't this is being done in - // the dispatcher thread, and most likely the acceptance message we - // want is sitting in the Dispatcher._queue waiting its turn for being - // dispatched so if we block here we won't can't get the message. - // So assume consumer is ready for action. - //receiverConversation.receive(); - } - catch (JMSException e) - { - log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage()); - } - } - - public void latejoin(Message message) - { - try - { - - TestClientDetails clientDetails = new TestClientDetails(); - clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); - clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); - - - registerReceiver(clientDetails); - } - catch (JMSException e) - { - //swallow - } - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the interop - * testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "Perf_SustainedPubSub"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java deleted file mode 100644 index 44fc090410..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; -import org.apache.qpid.interop.testclient.InteropClientTestCase; -import org.apache.qpid.util.CommandLineParser; - -import javax.jms.JMSException; -import javax.jms.Message; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Properties; - -public class TestClient extends org.apache.qpid.interop.testclient.TestClient -{ - private static Logger log = Logger.getLogger(TestClient.class); - - /** - * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client - * identifying name. - * - * @param brokerUrl The url of the broker to connect to. - * @param virtualHost The virtual host to conect to. - * @param clientName The client name to use. - */ - public TestClient(String brokerUrl, String virtualHost, String clientName) - { - super(brokerUrl, virtualHost, clientName); - } - - /** - * The entry point for the interop test coordinator. This client accepts the following command line arguments: - * - *

-b The broker URL. Optional.
-h The virtual - * host. Optional.
-n The test client name. Optional.
name=value - * Trailing argument define name/value pairs. Added to system properties. Optional.
- * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - // Use the command line parser to evaluate the command line. - CommandLineParser commandLine = - new CommandLineParser( - new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"n", "The test client name.", "name", "false"}, - {"j", "Join this test client to running test.", "join", ""} - }); - - // Capture the command line arguments or display errors and correct usage and then exit. - Properties options = null; - - try - { - options = commandLine.parseCommandLine(args); - } - catch (IllegalArgumentException e) - { - System.out.println(commandLine.getErrors()); - System.out.println(commandLine.getUsage()); - System.exit(1); - } - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - String clientName = options.getProperty("n"); - String join = options.getProperty("j"); - - // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up - // overridden values from there. - commandLine.addCommandLineToSysProperties(); - - // Create a test client and start it running. - TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); - - // Use a class path scanner to find all the interop test case implementations. - Collection> testCaseClasses = - new ArrayList>(); - // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); - // Hard code the test classes till the classpath scanner is fixed. - Collections.addAll(testCaseClasses, - SustainedTestClient.class); - - - try - { - client.start(testCaseClasses, join); - } - catch (Exception e) - { - log.error("The test client was unable to start.", e); - System.exit(1); - } - } - - protected void start(Collection> testCaseClasses, String join) throws JMSException, ClassNotFoundException - { - super.start(testCaseClasses); - log.debug("private void start(): called"); - - if (join != null && !join.equals("")) - { - Message latejoin = session.createMessage(); - - try - { - Object test = Class.forName(join).newInstance(); - if (test instanceof InteropClientTestCase) - { - currentTestCase = (InteropClientTestCase) test; - } - else - { - throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase."); - } - - latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN"); - latejoin.setStringProperty("CLIENT_NAME", clientName); - latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); - producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin); - } - catch (InstantiationException e) - { - log.warn("Unable to request latejoining of test:" + currentTestCase); - } - catch (IllegalAccessException e) - { - log.warn("Unable to request latejoining of test:" + currentTestCase); - } - } - } - -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java deleted file mode 100644 index 7e12fe39fb..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.sustained; - -import org.apache.qpid.interop.coordinator.Coordinator; -import org.apache.qpid.interop.coordinator.ListeningTestDecorator; -import org.apache.qpid.interop.coordinator.TestClientDetails; -import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.ConversationFactory; -import org.apache.log4j.Logger; - -import java.util.Properties; -import java.util.Set; - -import junit.framework.TestResult; -import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; - -import javax.jms.Connection; - -public class TestCoordinator extends Coordinator -{ - - private static final Logger log = Logger.getLogger(TestCoordinator.class); - - /** - * Creates an interop test coordinator on the specified broker and virtual host. - * - * @param brokerUrl The URL of the broker to connect to. - * @param virtualHost The virtual host to run all tests on. Optional, may be null. - */ - TestCoordinator(String brokerUrl, String virtualHost) - { - super(brokerUrl, virtualHost); - } - - protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set enlistedClients, ConversationFactory conversationFactory, Connection connection) - { - return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection); - } - - - /** - * The entry point for the interop test coordinator. This client accepts the following command line arguments: - * - *

-b The broker URL. Mandatory.
-h The virtual host. - * Optional.
name=value Trailing argument define name/value pairs. Added to system properties. - * Optional.
- * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Use the command line parser to evaluate the command line with standard handling behaviour (print errors - // and usage then exist if there are errors). - Properties options = - CommandLineParser.processCommandLine(args, - new CommandLineParser( - new String[][] - { - {"b", "The broker URL.", "broker", "false"}, - {"h", "The virtual host to use.", "virtual host", "false"}, - {"o", "The name of the directory to output test timings to.", "dir", "false"} - })); - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - String reportDir = options.getProperty("o"); - reportDir = (reportDir == null) ? "." : reportDir; - - - String[] testClassNames = {SustainedTestCoordinator.class.getName()}; - - // Create a coordinator and begin its test procedure. - Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost); - - coordinator.setReportDir(reportDir); - - TestResult testResult = coordinator.start(testClassNames); - - if (testResult.failureCount() > 0) - { - System.exit(FAILURE_EXIT); - } - else - { - System.exit(SUCCESS_EXIT); - } - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(EXCEPTION_EXIT); - } - } -} -- cgit v1.2.1