From 3396c24635dd2dc387bcb23196f241a1d6b8a35f Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Tue, 8 May 2007 12:04:37 +0000 Subject: Merged revisions 534903-535308,535310-535808,535810-536007,536009-536140,536142-536164 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r534903 | rgreig | 2007-05-03 16:09:18 +0100 (Thu, 03 May 2007) | 1 line More interop test stuff. ........ r535254 | rgreig | 2007-05-04 15:27:53 +0100 (Fri, 04 May 2007) | 1 line First two test cases completed. Still to do pub/sub. ........ r535874 | rgreig | 2007-05-07 14:13:06 +0100 (Mon, 07 May 2007) | 1 line Added remaining test case. ........ r536163 | rgreig | 2007-05-08 12:21:35 +0100 (Tue, 08 May 2007) | 1 line Added XML logging of test results. ........ r536164 | rgreig | 2007-05-08 12:39:51 +0100 (Tue, 08 May 2007) | 1 line Added inclusion of sender and receiver names in results. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536167 13f79535-47bb-0310-9956-ffa450edef68 --- java/integrationtests/jar-with-dependencies.xml | 29 ++ java/integrationtests/pom.xml | 13 + .../interop/coordinator/CoordinatingTestCase.java | 125 +++++- .../qpid/interop/coordinator/Coordinator.java | 136 +++++- .../interop/coordinator/InvitingTestDecorator.java | 43 +- .../qpid/interop/coordinator/OptOutTestCase.java | 13 + .../interop/coordinator/TestClientDetails.java | 2 +- .../qpid/interop/coordinator/XMLTestListener.java | 381 ++++++++++++++++ .../testcases/CoordinatingTestCase1DummyRun.java | 64 +++ .../testcases/CoordinatingTestCase2BasicP2P.java | 31 +- .../CoordinatingTestCase3BasicPubSub.java | 71 +++ .../interop/testclient/InteropClientTestCase.java | 4 +- .../apache/qpid/interop/testclient/TestClient.java | 105 +++-- .../testclient/testcases/TestCase1DummyRun.java | 2 + .../testclient/testcases/TestCase2BasicP2P.java | 94 +++- .../testclient/testcases/TestCase3BasicPubSub.java | 224 ++++++++++ .../org/apache/qpid/util/ConversationFactory.java | 479 +++++++++++++++++++++ .../org/apache/qpid/util/ConversationHelper.java | 306 ------------- 18 files changed, 1722 insertions(+), 400 deletions(-) create mode 100644 java/integrationtests/jar-with-dependencies.xml create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java (limited to 'java/integrationtests') diff --git a/java/integrationtests/jar-with-dependencies.xml b/java/integrationtests/jar-with-dependencies.xml new file mode 100644 index 0000000000..62978ee864 --- /dev/null +++ b/java/integrationtests/jar-with-dependencies.xml @@ -0,0 +1,29 @@ + + + all-test-deps + + jar + + false + + + + + true + test + + + + + target/classes + + + + target/test-classes + + + + diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml index fd7ac8b039..3afdf48204 100644 --- a/java/integrationtests/pom.xml +++ b/java/integrationtests/pom.xml @@ -103,6 +103,19 @@ + + + org.apache.maven.plugins + maven-assembly-plugin + 2.2-SNAPSHOT + + + jar-with-dependencies.xml + + target + target/assembly/work + + diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 36c270ef11..ef69d14be8 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -22,14 +22,17 @@ package org.apache.qpid.interop.coordinator; import java.util.Collection; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import junit.framework.TestCase; -import org.apache.qpid.util.ConversationHelper; +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ConversationFactory; /** * An CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a @@ -60,19 +63,24 @@ import org.apache.qpid.util.ConversationHelper; *

*
CRC Card
Responsibilities Collaborations *
Accept notification of test case participants. {@link InvitingTestDecorator} - *
Coordinate the test sequence amongst participants. {@link ConversationHelper} + *
Accpet JMS Connection to carry out the coordination over. + *
Coordinate the test sequence amongst participants. {@link ConversationFactory} *
Supply test properties *
*/ public abstract class CoordinatingTestCase extends TestCase { + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase.class); + /** Holds the contact details for the sending test client. */ TestClientDetails sender; /** Holds the contact details for the receving test client. */ TestClientDetails receiver; - ConversationHelper conversation; + /** Holds the conversation factory over which to coordinate the test. */ + ConversationFactory conversationFactory; /** * Creates a new coordinating test case with the specified name. @@ -91,6 +99,8 @@ public abstract class CoordinatingTestCase extends TestCase */ public void setSender(TestClientDetails sender) { + log.debug("public void setSender(TestClientDetails sender = " + sender + "): called"); + this.sender = sender; } @@ -101,6 +111,8 @@ public abstract class CoordinatingTestCase extends TestCase */ public void setReceiver(TestClientDetails receiver) { + log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called"); + this.receiver = receiver; } @@ -124,6 +136,46 @@ public abstract class CoordinatingTestCase extends TestCase return receiver; } + /** + * Returns the name of the current test method of this test class, with the sending and receiving client names + * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated + * in it. + * + * @return The unique test and client name. + */ + public String getName() + { + if ((sender == null) || (receiver == null)) + { + return super.getName(); + } + else + { + return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName; + } + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public abstract String getTestCaseNameForTestMethod(String methodName); + + /** + * Accepts the conversation factory over which to hold the test coordinating conversation. + * + * @param conversationFactory The conversation factory to coordinate the test over. + */ + public void setConversationFactory(ConversationFactory conversationFactory) + { + this.conversationFactory = conversationFactory; + } + /** * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports @@ -135,44 +187,81 @@ public abstract class CoordinatingTestCase extends TestCase * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ - protected Message[] sequenceTest(Properties testProperties) throws JMSException + protected Message[] sequenceTest(Map testProperties) throws JMSException { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + // Assign the sender role to the sending test client. - Message assignSender = conversation.getSession().createMessage(); + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignSender.setStringProperty("ROLE", "SENDER"); - conversation.send(assignSender); + senderConversation.send(senderControlTopic, assignSender); // Assign the receiver role the receiving client. - Message assignReceiver = conversation.getSession().createMessage(); + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, testProperties); assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignReceiver.setStringProperty("ROLE", "RECEIVER"); - conversation.send(assignReceiver); + receiverConversation.send(receiverControlTopic, assignReceiver); // Wait for the senders and receivers to confirm their roles. - conversation.receive(); - conversation.receive(); + senderConversation.receive(); + receiverConversation.receive(); // Start the test. - Message start = conversation.getSession().createMessage(); + Message start = session.createMessage(); start.setStringProperty("CONTROL_TYPE", "START"); - conversation.send(start); + senderConversation.send(senderControlTopic, start); // Wait for the test sender to return its report. - Message senderReport = conversation.receive(); + Message senderReport = senderConversation.receive(); + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { } // Ask the receiver for its report. - Message statusRequest = conversation.getSession().createMessage(); + Message statusRequest = session.createMessage(); statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); - conversation.send(statusRequest); + receiverConversation.send(receiverControlTopic, statusRequest); // Wait for the receiver to send its report. - Message receiverReport = conversation.receive(); + Message receiverReport = receiverConversation.receive(); return new Message[] { senderReport, receiverReport }; } + + /** + * Sets properties of different types on a JMS Message. + * + * @param message The message to set properties on. + * @param properties The property name/value pairs to set. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void setPropertiesOnMessage(Message message, Map properties) throws JMSException + { + for (Map.Entry entry : properties.entrySet()) + { + String name = entry.getKey(); + Object value = entry.getValue(); + + message.setObjectProperty(name, value); + } + } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java index 5e0f5b4941..de5faeac0f 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.interop.coordinator; +import java.io.*; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; @@ -31,18 +32,18 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun; import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P; -import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; import org.apache.qpid.interop.testclient.TestClient; -import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; -import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; -import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; -import org.apache.qpid.util.ConversationHelper; +import org.apache.qpid.util.ConversationFactory; import org.apache.qpid.util.PrettyPrintingUtils; -import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling; +import uk.co.thebadgerset.junit.extensions.TKTestResult; +import uk.co.thebadgerset.junit.extensions.TKTestRunner; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; /** *

Implements the coordinator client described in the interop testing specification @@ -51,13 +52,13 @@ import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; * *

*
CRC Card
Responsibilities Collaborations - *
Find out what test clients are available. {@link ConversationHelper} + *
Find out what test clients are available. {@link ConversationFactory} *
Decorate available tests to run all available clients. {@link InvitingTestDecorator} *
Attach XML test result logger. *
Terminate the interop testing framework. *
*/ -public class Coordinator extends TestRunnerImprovedErrorHandling +public class Coordinator extends TKTestRunner { private static final Logger log = Logger.getLogger(Coordinator.class); @@ -73,9 +74,20 @@ public class Coordinator extends TestRunnerImprovedErrorHandling Set enlistedClients = new HashSet(); /** Holds the conversation helper for the control conversation. */ - private ConversationHelper conversation; + private ConversationFactory conversationFactory; + + /** Holds the connection that the coordinating messages are sent over. */ private Connection connection; + /** + * Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult} + * method, but as the signature is already fixed for this, the current value gets pushed here as a member variable. + */ + private String currentTestClassName; + + /** Holds the path of the directory to output test results too, if one is defined. */ + private static String reportDir; + /** * Creates an interop test coordinator on the specified broker and virtual host. * @@ -114,19 +126,26 @@ public class Coordinator extends TestRunnerImprovedErrorHandling new String[][] { { "b", "The broker URL.", "broker", "false" }, - { "h", "The virtual host to use.", "virtual host", "false" } + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" } })); // Extract the command line options. String brokerUrl = options.getProperty("b"); String virtualHost = options.getProperty("h"); + reportDir = options.getProperty("o"); // Scan for available test cases using a classpath scanner. Collection> testCaseClasses = new ArrayList>(); // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true); // Hard code the test classes till the classpath scanner is fixed. - Collections.addAll(testCaseClasses, new Class[] { CoordinatingTestCase2BasicP2P.class }); + Collections.addAll(testCaseClasses, + new Class[] + { + CoordinatingTestCase1DummyRun.class, CoordinatingTestCase2BasicP2P.class, + CoordinatingTestCase3BasicPubSub.class + }); // Check that some test classes were actually found. if ((testCaseClasses == null) || testCaseClasses.isEmpty()) @@ -145,9 +164,12 @@ public class Coordinator extends TestRunnerImprovedErrorHandling // Create a coordinator and begin its test procedure. Coordinator coordinator = new Coordinator(brokerUrl, virtualHost); + + boolean failure = false; + TestResult testResult = coordinator.start(testClassNames); - if (!testResult.wasSuccessful()) + if (failure) { System.exit(FAILURE_EXIT); } @@ -176,7 +198,7 @@ public class Coordinator extends TestRunnerImprovedErrorHandling public TestResult start(String[] testClassNames) throws Exception { log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames) - + "): called"); + + ": called"); // Connect to the broker. connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); @@ -185,28 +207,39 @@ public class Coordinator extends TestRunnerImprovedErrorHandling Destination controlTopic = session.createTopic("iop.control"); Destination responseQueue = session.createQueue("coordinator"); - conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class); + conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + connection.start(); // Broadcast the compulsory invitation to find out what clients are available to test. Message invite = session.createMessage(); invite.setStringProperty("CONTROL_TYPE", "INVITE"); invite.setJMSReplyTo(responseQueue); - conversation.send(invite); + conversation.send(controlTopic, invite); // Wait for a short time, to give test clients an opportunity to reply to the invitation. - Collection enlists = conversation.receiveAll(0, 10000); + Collection enlists = conversation.receiveAll(0, 3000); enlistedClients = extractEnlists(enlists); - // Run all of the tests in the suite using JUnit. - TestResult result = super.start(testClassNames); + // Run the test in the suite using JUnit. + TestResult result = null; + + for (String testClassName : testClassNames) + { + // Record the current test class, so that the test results can be output to a file incorporating this name. + this.currentTestClassName = testClassName; + + result = super.start(new String[] { testClassName }); + } // At this point in time, all tests have completed. Broadcast the shutdown message. Message terminate = session.createMessage(); terminate.setStringProperty("CONTROL_TYPE", "TERMINATE"); - conversation.send(terminate); + conversation.send(controlTopic, terminate); return result; } @@ -283,8 +316,69 @@ public class Coordinator extends TestRunnerImprovedErrorHandling } // Wrap the tests in an inviting test decorator, to perform the invite/test cycle. - targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation); + targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + + TestSuite suite = new TestSuite(); + suite.addTest(targetTest); + + // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread. + // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 }); - return super.doRun(targetTest, wait); + return super.doRun(suite, wait); + } + + /** + * Creates the TestResult object to be used for test runs. + * + * @return An instance of the test result object. + */ + protected TestResult createTestResult() + { + log.debug("protected TestResult createTestResult(): called"); + + TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName); + + // Check if a directory to output reports to has been specified and attach test listeners if so. + if (reportDir != null) + { + // Create the report directory if it does not already exist. + File reportDirFile = new File(reportDir); + + if (!reportDirFile.exists()) + { + reportDirFile.mkdir(); + } + + // Create the timings file (make the name of this configurable as a command line parameter). + Writer timingsWriter = null; + + try + { + File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml"); + timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000); + } + catch (IOException e) + { + throw new RuntimeException("Unable to create the log file to write test results to: " + e, e); + } + + // Set up a CSV results listener to output the timings to the results file. + XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName); + result.addListener(listener); + result.addTKTestListener(listener); + + // Register the results listeners shutdown hook to flush its data if the test framework is shutdown + // prematurely. + // registerShutdownHook(listener); + + // Record the start time of the batch. + // result.notifyStartBatch(); + + // At this point in time the test class has been instantiated, giving it an opportunity to read its parameters. + // Inform any test listers of the test properties. + result.notifyTestProperties(TestContextProperties.getAccessedProps()); + } + + return result; } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java index 2975082631..858ed1a589 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -23,6 +23,8 @@ package org.apache.qpid.interop.coordinator; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -32,14 +34,14 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; -import org.apache.qpid.util.ConversationHelper; +import org.apache.qpid.util.ConversationFactory; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; /** *

*
CRC Card
Responsibilities Collaborations - *
Broadcast test invitations and collect enlists. {@link ConversationHelper}. + *
Broadcast test invitations and collect enlists. {@link ConversationFactory}. *
Output test failures for clients unwilling to run the test case. {@link Coordinator} *
Execute coordinated test cases. {@link CoordinatingTestCase} *
@@ -52,7 +54,10 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator Set allClients; /** Holds the conversation helper for the control level conversation for coordinating the test through. */ - ConversationHelper conversation; + ConversationFactory conversationFactory; + + /** Holds the connection that the control conversation is held over. */ + Connection connection; /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ WrappedSuiteTestDecorator testSuite; @@ -61,11 +66,12 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator * Creates a wrapped suite test decorator from another one. * * @param suite The test suite. - * @param availableClients The list of all clients that responded to the compulsory invite. + * @param availableClients The list of all clients that responded to the compulsory invite. * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. */ public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, - ConversationHelper controlConversation) + ConversationFactory controlConversation, Connection controlConnection) { super(suite); @@ -74,7 +80,8 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator testSuite = suite; allClients = availableClients; - conversation = controlConversation; + conversationFactory = controlConversation; + connection = controlConnection; } /** @@ -103,14 +110,17 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator Set enlists = null; try { - Message invite = conversation.getSession().createMessage(); + Message invite = conversationFactory.getSession().createMessage(); + Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + invite.setStringProperty("CONTROL_TYPE", "INVITE"); - invite.setStringProperty("TEST_NAME", coordTest.getName()); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); - conversation.send(invite); + conversation.send(controlTopic, invite); // Wait for a short time, to give test clients an opportunity to reply to the invitation. - Collection replies = conversation.receiveAll(allClients.size(), 10000); + Collection replies = conversation.receiveAll(allClients.size(), 3000); enlists = Coordinator.extractEnlists(replies); } catch (JMSException e) @@ -143,12 +153,25 @@ public class InvitingTestDecorator extends WrappedSuiteTestDecorator coordTest.setSender(enlistedPair.get(0)); coordTest.setReceiver(enlistedPair.get(1)); + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + // Execute the test case. coordTest.run(testResult); } } } + /** + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. + */ + public String toString() + { + return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]"; + } + /** * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is * important, that is the pair is distinct from ; both pairs are generated. For any element, i, in diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java index 27e2c42a9a..42a382a898 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java @@ -49,4 +49,17 @@ public class OptOutTestCase extends CoordinatingTestCase { Assert.fail("One of " + getSender() + " and " + getReceiver() + " opted out of the test."); } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "OptOutTest"; + } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java index 80abd4d4c9..c4a9d39cd8 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -82,6 +82,6 @@ public class TestClientDetails */ public String toString() { - return "clientName = " + clientName + ", privateControlKey = " + privateControlKey; + return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]"; } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java new file mode 100644 index 0000000000..6c7a2a5d52 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java @@ -0,0 +1,381 @@ +package org.apache.qpid.interop.coordinator; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Writer; +import java.util.*; + +import junit.framework.AssertionFailedError; +import junit.framework.Test; +import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener; + +/** + * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified + * writer. + * + *

The API for this listener accepts notifications about different aspects of a tests results through different + * methods, so some assumption needs to be made as to which test result a notification refers to. For example + * {@link #startTest} will be called, then possibly {@link #timing} will be called, even though the test instance is + * passed in both cases, it is not enough to distinguish a particular run of the test, as the test case instance may + * be being shared between multiple threads, or being run a repeated number of times, and can therfore be re-used + * between calls. The listeners make the assumption that, for every test, a unique thread will call {@link #startTest} + * and {@link #endTest} to delimit each test. All calls to set test parameters, timings, state and so on, will occur + * between the start and end and will be given with the same thread id as the start and end, so the thread id provides + * a unqiue value to identify a particular test run against. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ * + * @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring + * out the results printing code into sub-classes. Provide a simple XML results formatter with the same format as + * the ant XML formatter, and a more structured one for outputing results with timings and summaries from + * performance tests. + */ +public class XMLTestListener implements TKTestListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(XMLTestListener.class); + + /** The results file writer. */ + protected Writer writer; + + /** Holds the results for individual tests. */ + // protected Map results = new LinkedHashMap(); + // protected List results = new ArrayList(); + + /** + * Map for holding results on a per thread basis as they come in. A ThreadLocal is not used as sometimes an + * explicit thread id must be used, where notifications come from different threads than the ones that called + * the test method. + */ + Map threadLocalResults = Collections.synchronizedMap(new LinkedHashMap()); + + /** + * Holds results for tests that have ended. Transferring these results here from the per-thread results map, means + * that the thread id is freed for the thread to generate more results. + */ + List results = new ArrayList(); + + /** Holds the overall error count. */ + protected int errors = 0; + + /** Holds the overall failure count. */ + protected int failures = 0; + + /** Holds the overall tests run count. */ + protected int runs = 0; + + /** Holds the name of the class that tests are being run for. */ + String testClassName; + + /** + * Creates a new XML results output listener that writes to the specified location. + * + * @param writer The location to write results to. + */ + public XMLTestListener(Writer writer, String testClassName) + { + log.debug("public XMLTestListener(Writer writer, String testClassName = " + testClassName + "): called"); + + this.writer = writer; + this.testClassName = testClassName; + } + + /** + * Resets the test results to the default state of time zero, memory usage zero, parameter zero, test passed. + * + * @param test The test to resest any results for. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void reset(Test test, Long threadId) + { + log.debug("public void reset(Test test = " + test + ", Long threadId = " + threadId + "): called"); + + XMLTestListener.Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + + r.error = null; + r.failure = null; + + } + + /** + * A test started. + */ + public void startTest(Test test) + { + log.debug("public void startTest(Test test = " + test + "): called"); + + Result newResult = new Result(test.getClass().getName(), ((TestCase) test).getName()); + + // Initialize the thread local test results. + threadLocalResults.put(Thread.currentThread().getId(), newResult); + runs++; + } + + /** + * Should be called every time a test completes with the run time of that test. + * + * @param test The name of the test. + * @param nanos The run time of the test in nanoseconds. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void timing(Test test, long nanos, Long threadId) + { } + + /** + * Should be called every time a test completed with the amount of memory used before and after the test was run. + * + * @param test The test which memory was measured for. + * @param memStart The total JVM memory used before the test was run. + * @param memEnd The total JVM memory used after the test was run. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void memoryUsed(Test test, long memStart, long memEnd, Long threadId) + { } + + /** + * Should be called every time a parameterized test completed with the int value of its test parameter. + * + * @param test The test which memory was measured for. + * @param parameter The int parameter value. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void parameterValue(Test test, int parameter, Long threadId) + { } + + /** + * Should be called every time a test completes with the current number of test threads running. + * + * @param test The test for which the measurement is being generated. + * @param threads The number of tests being run concurrently. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void concurrencyLevel(Test test, int threads, Long threadId) + { } + + /** + * Notifies listeners of the tests read/set properties. + * + * @param properties The tests read/set properties. + */ + public void properties(Properties properties) + { } + + /** + * A test ended. + */ + public void endTest(Test test) + { + log.debug("public void endTest(Test test = " + test + "): called"); + + // Move complete test results into the completed tests list. + Result r = threadLocalResults.get(Thread.currentThread().getId()); + results.add(r); + + // Clear all the test results for the thread. + threadLocalResults.remove(Thread.currentThread().getId()); + } + + /** + * Called when a test completes. Success, failure and errors. This method should be used when registering an + * end test from a different thread than the one that started the test. + * + * @param test The test which completed. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void endTest(Test test, Long threadId) + { + log.debug("public void endTest(Test test = " + test + ", Long threadId = " + threadId + "): called"); + + // Move complete test results into the completed tests list. + Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + results.add(r); + + // Clear all the test results for the thread. + threadLocalResults.remove(Thread.currentThread().getId()); + } + + /** + * An error occurred. + */ + public void addError(Test test, Throwable t) + { + log.debug("public void addError(Test test = " + test + ", Throwable t = " + t + "): called"); + + Result r = threadLocalResults.get(Thread.currentThread().getId()); + r.error = t; + errors++; + } + + /** + * A failure occurred. + */ + public void addFailure(Test test, AssertionFailedError t) + { + log.debug("public void addFailure(Test test = " + test + ", AssertionFailedError t = " + t + "): called"); + + Result r = threadLocalResults.get(Thread.currentThread().getId()); + r.failure = t; + failures++; + } + + /** + * Called when a test completes to mark it as a test fail. This method should be used when registering a + * failure from a different thread than the one that started the test. + * + * @param test The test which failed. + * @param e The assertion that failed the test. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void addFailure(Test test, AssertionFailedError e, Long threadId) + { + log.debug("public void addFailure(Test test, AssertionFailedError e, Long threadId): called"); + + Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + r.failure = e; + failures++; + } + + /** + * Notifies listeners of the start of a complete run of tests. + */ + public void startBatch() + { + log.debug("public void startBatch(): called"); + + // Reset all results counts. + threadLocalResults = Collections.synchronizedMap(new HashMap()); + errors = 0; + failures = 0; + runs = 0; + + // Write out the file header. + try + { + writer.write("\n"); + } + catch (IOException e) + { + throw new RuntimeException("Unable to write the test results.", e); + } + } + + /** + * Notifies listeners of the end of a complete run of tests. + * + * @param parameters The optional test parameters to log out with the batch results. + */ + public void endBatch(Properties parameters) + { + log.debug("public void endBatch(Properties parameters = " + parameters + "): called"); + + // Write out the results. + try + { + // writer.write("\n"); + writer.write("\n"); + + for (Result result : results) + { + writer.write(" \n"); + + if (result.error != null) + { + writer.write(" "); + result.error.printStackTrace(new PrintWriter(writer)); + writer.write(" "); + } + else if (result.failure != null) + { + writer.write(" "); + result.failure.printStackTrace(new PrintWriter(writer)); + writer.write(" "); + } + + writer.write(" \n"); + } + + writer.write("\n"); + writer.flush(); + } + catch (IOException e) + { + throw new RuntimeException("Unable to write the test results.", e); + } + } + + /** + * Used to capture the results of a particular test run. + */ + protected static class Result + { + public Result(String testClass, String testName) + { + this.testClass = testClass; + this.testName = testName; + } + + public String testClass; + public String testName; + + /** Holds the exception that caused error in this test. */ + public Throwable error; + + /** Holds the assertion exception that caused failure in this test. */ + public AssertionFailedError failure; + + /** Holds the error count for this test. */ + // public int errors = 0; + + /** Holds the failure count for this tests. */ + // public int failures = 0; + + /** Holds the overall tests run count for this test. */ + // public int runs = 0; + + /*public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (!(o instanceof Result)) + { + return false; + } + + final Result result = (Result) o; + + if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null)) + { + return false; + } + + if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null)) + { + return false; + } + + return true; + } + + public int hashCode() + { + int result; + result = ((testClass != null) ? testClass.hashCode() : 0); + result = (29 * result) + ((testName != null) ? testName.hashCode() : 0); + + return result; + }*/ + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java new file mode 100644 index 0000000000..e20f702dd1 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java @@ -0,0 +1,64 @@ +package org.apache.qpid.interop.coordinator.testcases; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Message; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.CoordinatingTestCase; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Exercises the interop testing framework without actually sending any test messages. + * {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase} + *
+ */ +public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase1DummyRun(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + */ + public void testDummyRun() throws Exception + { + log.debug("public void testDummyRun(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC1_DummyRun"); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC1_DummyRun"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java index 4fb9729c4a..d66d42dd51 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java @@ -1,11 +1,14 @@ package org.apache.qpid.interop.coordinator.testcases; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import javax.jms.Message; import junit.framework.Assert; +import org.apache.log4j.Logger; + import org.apache.qpid.interop.coordinator.CoordinatingTestCase; /** @@ -16,6 +19,9 @@ import org.apache.qpid.interop.coordinator.CoordinatingTestCase; */ public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase { + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class); + /** * Creates a new coordinating test case with the specified name. * @@ -31,10 +37,12 @@ public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase */ public void testBasicP2P() throws Exception { - Properties testConfig = new Properties(); - testConfig.setProperty("TEST_CASE", "TC2_BasicP2P"); - testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); - testConfig.setProperty("P2P_NUM_MESSAGES", "50"); + log.debug("public void testBasicP2P(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC2_BasicP2P"); + testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); + testConfig.put("P2P_NUM_MESSAGES", 50); Message[] reports = sequenceTest(testConfig); @@ -45,4 +53,17 @@ public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived); } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC2_BasicP2P"; + } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java new file mode 100644 index 0000000000..ce46bb87ba --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java @@ -0,0 +1,71 @@ +package org.apache.qpid.interop.coordinator.testcases; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Message; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.CoordinatingTestCase; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Setup pub/sub test parameters and compare with test output. {@link CoordinatingTestCase} + *
+ */ +public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase3BasicPubSub(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testBasicPubSub(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC3_BasicPubSub"); + testConfig.put("PUBSUB_KEY", "tc3route"); + testConfig.put("PUBSUB_NUM_MESSAGES", 10); + testConfig.put("PUBSUB_NUM_RECEIVERS", 5); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); + Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, + messagesReceived); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC3_BasicPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java index 57726285f9..9f769822de 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -79,8 +79,10 @@ public interface InteropClientTestCase extends MessageListener /** * Performs the test case actions. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. */ - public void start(); + public void start() throws JMSException; /** * Gets a report on the actions performed by the test case in its assigned role. diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 6d7ab9c978..0b9c72e1b6 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -21,10 +21,7 @@ package org.apache.qpid.interop.testclient; import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.*; import javax.jms.*; import javax.naming.Context; @@ -33,6 +30,9 @@ import javax.naming.NamingException; import org.apache.log4j.Logger; +import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; +import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; +import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub; import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; import org.apache.qpid.util.PropertiesUtils; @@ -61,41 +61,53 @@ public class TestClient implements MessageListener { private static Logger log = Logger.getLogger(TestClient.class); + public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; + public static final String CONNECTION_NAME = "broker"; + public static final String CLIENT_NAME = "java"; + public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; + /** Holds the URL of the broker to run the tests on. */ - String brokerUrl; + public static String brokerUrl; /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ - String virtualHost; + public static String virtualHost; /** Holds all the test cases loaded from the classpath. */ Map testCases = new HashMap(); InteropClientTestCase currentTestCase; - public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; - public static final String CONNECTION_NAME = "broker"; - public static final String CLIENT_NAME = "java"; - public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; - private MessageProducer producer; private Session session; - public TestClient(String brokerUrl, String virtualHost) + private String clientName = CLIENT_NAME; + + /** + * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified + * client identifying name. + * + * @param brokerUrl The url of the broker to connect to. + * @param virtualHost The virtual host to conect to. + * @param clientName The client name to use. + */ + public TestClient(String brokerUrl, String virtualHost, String clientName) { log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost - + "): called"); + + ", String clientName = " + clientName + "): called"); // Retain the connection parameters. this.brokerUrl = brokerUrl; this.virtualHost = virtualHost; + this.clientName = clientName; } /** * The entry point for the interop test coordinator. This client accepts the following command line arguments: * *

- *
-b The broker URL. Mandatory. - *
-h The virtual host. Optional. + *
-b The broker URL. Optional. + *
-h The virtual host. Optional. + *
-n The test client name. Optional. *
name=value Trailing argument define name/value pairs. Added to system properties. Optional. *
* @@ -105,11 +117,13 @@ public class TestClient implements MessageListener { // Use the command line parser to evaluate the command line. CommandLineParser commandLine = - new CommandLineParser(new String[][] - { - { "b", "The broker URL.", "broker", "false" }, - { "h", "The virtual host to use.", "virtual host", "false" } - }); + new CommandLineParser( + new String[][] + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "n", "The test client name.", "name", "false" } + }); // Capture the command line arguments or display errors and correct usage and then exit. Properties options = null; @@ -128,13 +142,14 @@ public class TestClient implements MessageListener // Extract the command line options. String brokerUrl = options.getProperty("b"); String virtualHost = options.getProperty("h"); + String clientName = options.getProperty("n"); // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up // overridden values from there. commandLine.addCommandLineToSysProperties(); // Create a test client and start it running. - TestClient client = new TestClient(brokerUrl, virtualHost); + TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); try { @@ -147,13 +162,22 @@ public class TestClient implements MessageListener } } + /** + * Starts the interop test client running. This causes it to start listening for incoming test invites. + * + * @throws JMSException Any underlying JMSExceptions are allowed to fall through. + */ private void start() throws JMSException { log.debug("private void start(): called"); // Use a class path scanner to find all the interop test case implementations. Collection> testCaseClasses = - ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); + new ArrayList>(); + // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); + // Hard code the test classes till the classpath scanner is fixed. + Collections.addAll(testCaseClasses, + new Class[] { TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class }); // Create all the test case implementations and index them by the test names. for (Class nextClass : testCaseClasses) @@ -181,9 +205,12 @@ public class TestClient implements MessageListener session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Set this up to listen for control messages. - MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + CLIENT_NAME)); + MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName)); consumer.setMessageListener(this); + MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control")); + consumer2.setMessageListener(this); + // Create a producer to send replies with. producer = session.createProducer(null); @@ -209,13 +236,13 @@ public class TestClient implements MessageListener public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) { log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource - + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called"); + + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called"); try { Properties connectionProps = PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream( - connectionPropsResource)); + connectionPropsResource)); if (brokerUrl != null) { @@ -270,6 +297,8 @@ public class TestClient implements MessageListener if (testCaseName != null) { + log.debug("Got an invite to test: " + testCaseName); + // Check if the requested test case is available. InteropClientTestCase testCase = testCases.get(testCaseName); @@ -282,6 +311,8 @@ public class TestClient implements MessageListener } else { + log.debug("Got a compulsory invite."); + enlist = true; } @@ -290,8 +321,8 @@ public class TestClient implements MessageListener // Reply with the client name in an Enlist message. Message enlistMessage = session.createMessage(); enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); - enlistMessage.setStringProperty("CLIENT_NAME", CLIENT_NAME); - enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + CLIENT_NAME); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); producer.send(message.getJMSReplyTo(), enlistMessage); @@ -300,7 +331,10 @@ public class TestClient implements MessageListener else if ("ASSIGN_ROLE".equals(controlType)) { // Assign the role to the current test case. - String roleName = message.getStringProperty(""); + String roleName = message.getStringProperty("ROLE"); + + log.debug("Got a role assignment to role: " + roleName); + InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName); currentTestCase.assignRole(role, message); @@ -316,9 +350,15 @@ public class TestClient implements MessageListener { if ("START".equals(controlType)) { + log.debug("Got a start notification."); + // Start the current test case. currentTestCase.start(); } + else + { + log.debug("Got a status request."); + } // Generate the report from the test case and reply with it as a Report message. Message reportMessage = currentTestCase.getReport(session); @@ -327,10 +367,17 @@ public class TestClient implements MessageListener producer.send(message.getJMSReplyTo(), reportMessage); } + else if ("TERMINATE".equals(controlType)) + { + System.out.println("Received termination instruction from coordinator."); + + // Is a cleaner shutdown needed? + System.exit(0); + } else { // Log a warning about this but otherwise ignore it. - log.warn("Got an unknown control message: " + message); + log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); } } catch (JMSException e) diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java index aa31a59b06..874f86daa9 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -84,6 +84,8 @@ public class TestCase1DummyRun implements InteropClientTestCase public void onMessage(Message message) { + log.debug("public void onMessage(Message message = " + message + "): called"); + // Ignore any messages. } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java index c3c05d8fd9..ea62b46451 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.interop.testclient.testcases; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; +import javax.jms.*; + +import org.apache.log4j.Logger; import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.interop.testclient.TestClient; /** * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the @@ -41,6 +42,30 @@ import org.apache.qpid.interop.testclient.InteropClientTestCase; */ public class TestCase2BasicP2P implements InteropClientTestCase { + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase2BasicP2P.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The connection to send the test messages on. */ + private Connection connection; + + /** The session to send the test messages on. */ + private Session session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + /** * Should provide the name of the test case that this class implements. The exact names are defined in the * interop testing spec. @@ -49,6 +74,8 @@ public class TestCase2BasicP2P implements InteropClientTestCase */ public String getName() { + log.debug("public String getName(): called"); + return "TC2_BasicP2P"; } @@ -63,6 +90,8 @@ public class TestCase2BasicP2P implements InteropClientTestCase */ public boolean acceptInvite(Message inviteMessage) throws JMSException { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + // All invites are acceptable. return true; } @@ -79,27 +108,65 @@ public class TestCase2BasicP2P implements InteropClientTestCase */ public void assignRole(Roles role, Message assignRoleMessage) throws JMSException { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + // Take note of the role to be played. + this.role = role; + + // Create a new connection to pass the test messages on. + connection = + TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl, + TestClient.virtualHost); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); + sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); - // Create a new connection to pass the test messages on. + log.debug("numMessages = " + numMessages); + log.debug("sendDestination = " + sendDestination); + log.debug("role = " + role); - // Check if the sender role is being assigned, and set up a message producer if so. + switch (role) { - } + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + producer = session.createProducer(sendDestination); + break; + // Otherwise the receiver role is being assigned, so set this up to listen for messages. - { + case RECEIVER: + MessageConsumer consumer = session.createConsumer(sendDestination); + consumer.setMessageListener(this); + break; } + + connection.start(); } /** * Performs the test case actions. */ - public void start() + public void start() throws JMSException { + log.debug("public void start(): called"); + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) { + Message testMessage = session.createTextMessage("test"); + + for (int i = 0; i < numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } } } @@ -114,11 +181,17 @@ public class TestCase2BasicP2P implements InteropClientTestCase */ public Message getReport(Session session) throws JMSException { + log.debug("public Message getReport(Session session): called"); + // Close the test connection. + connection.close(); // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); - return null; + return report; } /** @@ -128,6 +201,9 @@ public class TestCase2BasicP2P implements InteropClientTestCase */ public void onMessage(Message message) { + log.debug("public void onMessage(Message message = " + message + "): called"); + // Increment the message count. + messageCount++; } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java new file mode 100644 index 0000000000..2773cad3f3 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -0,0 +1,224 @@ +package org.apache.qpid.interop.testclient.testcases; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.interop.testclient.TestClient; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of + * messages sent/received. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Send required number of test messages using pub/sub. + *
Generate test reports. + *
+ */ +public class TestCase3BasicPubSub implements InteropClientTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase3BasicPubSub.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The number of receiver connection to use. */ + private int numReceivers; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC3_BasicPubSub"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return true to accept the invitation, false to reject it. + * + * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); + numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); + String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); + + log.debug("numMessages = " + numMessages); + log.debug("numReceivers = " + numReceivers); + log.debug("sendKey = " + sendKey); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = + TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl, + TestClient.virtualHost); + session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + sendDestination = session[0].createTopic(sendKey); + + producer = session[0].createProducer(sendDestination); + break; + + // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number + // of receiver connections. + case RECEIVER: + // Create the required number of receiver connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = + TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl, + TestClient.virtualHost); + session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + sendDestination = session[i].createTopic(sendKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + consumer.setMessageListener(this); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (int i = 0; i < connection.length; i++) + { + connection[i].start(); + } + } + + /** + * Performs the test case actions. + */ + public void start() throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = session[0].createTextMessage("test"); + + for (int i = 0; i < numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Close the test connections. + for (int i = 0; i < connection.length; i++) + { + connection[i].close(); + } + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java new file mode 100644 index 0000000000..4ca2fe8ff5 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java @@ -0,0 +1,479 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +/** + * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation + * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant + * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. + * + *

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a + * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation + * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order + * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded + * conversation (the conversation methods can be called many times in parallel): + * + *

+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ *  try {
+ *   // Exchange greetings.
+ *   conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ *   Message greeting = conversation.receive();
+ *
+ *   // Exchange goodbyes.
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *   Message goodbye = conversation.receive();
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ *   try {
+ *   // Exchange greetings.
+ *   Message greeting = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ *   // Exchange goodbyes.
+ *   Message goodbye = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ * 
+ * + *

Conversation correlation id's are generated on a per thread basis. + * + *

The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS + * sessions are not multi-threaded. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Associate messages to an ongoing conversation using correlation ids. + *
Auto manage sessions for conversations. + *
Store messages not in a conversation in dead letter box. + *
+ */ +public class ConversationFactory +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(ConversationFactory.class); + + /** Holds a map from correlation id's to queues. */ + private Map> idsToQueues = new HashMap>(); + + /** Holds the connection over which the conversation is conducted. */ + private Connection connection; + + /** Holds the session over which the conversation is conduxted. */ + private Session session; + + /** The message consumer for incoming messages. */ + MessageConsumer consumer; + + /** The message producer for outgoing messages. */ + MessageProducer producer; + + /** The well-known or temporary destination to receive replies on. */ + Destination receiveDestination; + + /** Holds the queue implementation class for the reply queue. */ + Class queueClass; + + /** Used to hold any replies that are received outside of the context of a conversation. */ + BlockingQueue deadLetterBox = new LinkedBlockingQueue(); + + /* Used to hold conversation state on a per thread basis. */ + /* + ThreadLocal threadLocals = + new ThreadLocal() + { + protected Conversation initialValue() + { + Conversation settings = new Conversation(); + settings.conversationId = conversationIdGenerator.getAndIncrement(); + + return settings; + } + }; + */ + + /** Generates new coversation id's as needed. */ + AtomicLong conversationIdGenerator = new AtomicLong(); + + /** + * Creates a conversation helper on the specified connection with the default sending destination, and listening + * to the specified receiving destination. + * + * @param connection The connection to build the conversation helper on. + * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary + * queue. + * @param queueClass The queue implementation class. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public ConversationFactory(Connection connection, Destination receiveDestination, + Class queueClass) throws JMSException + { + log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination + + ", Class queueClass = " + queueClass + "): called"); + + this.connection = connection; + this.queueClass = queueClass; + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Check if a well-known receive destination has been provided, or use a temporary queue if not. + this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue(); + + consumer = session.createConsumer(receiveDestination); + producer = session.createProducer(null); + + consumer.setMessageListener(new Receiver()); + } + + /** + * Creates a new conversation context. + * + * @return A new conversation context. + */ + public Conversation startConversation() + { + log.debug("public Conversation startConversation(): called"); + + Conversation conversation = new Conversation(); + conversation.conversationId = conversationIdGenerator.getAndIncrement(); + + return conversation; + } + + /** + * Ensures that the reply queue for a conversation exists. + * + * @param conversationId The conversation correlation id. + */ + private void initQueueForId(long conversationId) + { + if (!idsToQueues.containsKey(conversationId)) + { + idsToQueues.put(conversationId, ReflectionUtils.newInstance(queueClass)); + } + } + + /** + * Clears the dead letter box, returning all messages that were in it. + * + * @return All messages in the dead letter box. + */ + public Collection emptyDeadLetterBox() + { + log.debug("public Collection emptyDeadLetterBox(): called"); + + Collection result = new ArrayList(); + deadLetterBox.drainTo(result); + + return result; + } + + /** + * Gets the session over which the conversation is conducted. + * + * @return The session over which the conversation is conducted. + */ + public Session getSession() + { + // Conversation settings = threadLocals.get(); + + return session; + } + + /** + * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply + * destination automatically updated to the last received reply-to destination. + */ + public class Conversation + { + /** Holds the correlation id for the context. */ + long conversationId; + + /** + * Holds the send destination for the context. This will automatically be updated to the most recently received + * reply-to destination. + */ + Destination sendDestination; + + /** + * Sends a message to the default sending location. The correlation id of the message will be assigned by this + * method, overriding any previously set value. + * + * @param sendDestination The destination to send to. This may be null to use the last received reply-to + * destination. + * @param message The message to send. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no + * send destination is specified and there is no most recent reply-to destination available + * to use. + */ + public void send(Destination sendDestination, Message message) throws JMSException + { + log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message + + "): called"); + + // Conversation settings = threadLocals.get(); + // long conversationId = conversationId; + message.setJMSCorrelationID(Long.toString(conversationId)); + message.setJMSReplyTo(receiveDestination); + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + // Check if an overriding send to destination has been set or use the last reply-to if not. + Destination sendTo = null; + + if (sendDestination != null) + { + sendTo = sendDestination; + } + else if (sendDestination != null) + { + sendTo = sendDestination; + } + else + { + throw new JMSException("The send destination was specified, and no most recent reply-to available to use."); + } + + // Send the message. + synchronized (this) + { + producer.send(sendTo, message); + } + } + + /** + * Gets the next message in an ongoing conversation. This method may block until such a message is received. + * + * @return The next incoming message in the conversation. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message + * did not have its reply-to destination set up. + */ + public Message receive() throws JMSException + { + log.debug("public Message receive(): called"); + + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + BlockingQueue queue = idsToQueues.get(conversationId); + + try + { + Message result = queue.take(); + + // Keep the reply-to destination to send replies to. + sendDestination = result.getJMSReplyTo(); + + return result; + } + catch (InterruptedException e) + { + return null; + } + } + + /** + * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are + * received they will be returned. If a timeout is specified, then all messages up to the limit, received within + * that timespan will be returned. At least one of the message count or timeout should be set to a value of + * 1 or greater. + * + * @param num The number of messages to receive, or all if this is less than 1. + * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. + * + * @return All messages received within the count limit and the timeout. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public Collection receiveAll(int num, long timeout) throws JMSException + { + log.debug("public Collection receiveAll(int num = " + num + ", long timeout = " + timeout + + "): called"); + + // Check that a timeout or message count was set. + if ((num < 1) && (timeout < 1)) + { + throw new IllegalArgumentException("At least one of message count (num) or timeout must be set."); + } + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + BlockingQueue queue = idsToQueues.get(conversationId); + + // Used to collect the received messages in. + Collection result = new ArrayList(); + + // Used to indicate when the timeout or message count has expired. + boolean receiveMore = true; + + int messageCount = 0; + + // Receive messages until the timeout or message count expires. + do + { + try + { + Message next = null; + + // Try to receive the message with a timeout if one has been set. + if (timeout > 0) + { + next = queue.poll(timeout, TimeUnit.MILLISECONDS); + + // Check if the timeout expired, and stop receiving if so. + if (next == null) + { + receiveMore = false; + } + } + // Receive the message without a timeout. + else + { + next = queue.take(); + } + + // Increment the message count if a message was received. + messageCount += (next != null) ? 1 : 0; + + // Check if all the requested messages were received, and stop receiving if so. + if ((num > 0) && (messageCount >= num)) + { + receiveMore = false; + } + + // Keep the reply-to destination to send replies to. + sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination; + + if (next != null) + { + result.add(next); + } + } + catch (InterruptedException e) + { + // Restore the threads interrupted status. + Thread.currentThread().interrupt(); + + // Stop receiving but return the messages received so far. + receiveMore = false; + } + } + while (receiveMore); + + return result; + } + + /** + * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any + * incoming messages using them will go to the dead letter box. + */ + public void end() + { + log.debug("public void end(): called"); + + // Ensure that the thread local for the current thread is cleaned up. + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + // threadLocals.remove(); + + // Ensure that its queue is removed from the queue map. + BlockingQueue queue = idsToQueues.remove(conversationId); + + // Move any outstanding messages on the threads conversation id into the dead letter box. + queue.drainTo(deadLetterBox); + } + } + + /** + * Implements the message listener for this conversation handler. + */ + protected class Receiver implements MessageListener + { + /** + * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id + * and placed into queues. + * + * @param message The incoming message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + try + { + Long conversationId = Long.parseLong(message.getJMSCorrelationID()); + + // Find the converstaion queue to place the message on. If there is no conversation for the message id, + // the the dead letter box queue is used. + BlockingQueue queue = idsToQueues.get(conversationId); + queue = (queue == null) ? deadLetterBox : queue; + + queue.put(message); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java deleted file mode 100644 index 1fd1fee377..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.*; - -/** - * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation - * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant - * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. - * - *

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a - * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation - * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order - * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded - * conversation (the conversation methods can be called many times in parallel): - * - *

- * ConversationHelper conversation = new ConversationHelper(connection, sendDesitination, replyDestination,
- *                                                          java.util.concurrent.LinkedBlockingQueue.class);
- *
- * initiateConversation()
- * {
- *  try {
- *   // Exchange greetings.
- *   conversation.send(conversation.getSession().createTextMessage("Hello."));
- *   Message greeting = conversation.receive();
- *
- *   // Exchange goodbyes.
- *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- *   Message goodbye = conversation.receive();
- *  } finally {
- *   conversation.end();
- *  }
- * }
- *
- * respondToConversation()
- * {
- *   try {
- *   // Exchange greetings.
- *   Message greeting = conversation.receive();
- *   conversation.send(conversation.getSession().createTextMessage("Hello."));
- *
- *   // Exchange goodbyes.
- *   Message goodbye = conversation.receive();
- *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
- *  } finally {
- *   conversation.end();
- *  }
- * }
- *
- * 
- * - *

- *
CRC Card
Responsibilities Collaborations - *
Associate messages to an ongoing conversation using correlation ids. - *
Auto manage sessions for conversations. - *
Store messages not in a conversation in dead letter box. - *
- * - * @todo Non-transactional, can use shared session. Transactional, must have session per-thread. Session pool? In - * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some - * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all - * subsequent sends and receive at least until the transaction is committed. So a message selector must be used - * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use - * a temporary response queue, with only that session listening to it. - * - * @todo Want something convenient that hides details. Write out some example use cases to get the best feel for - * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive - * methods. Bind corrId, session etc. on thread locals? Clean on endConvo. Provide deadLetter box, that - * uncorrelated or late messages go in. Provide time-out on wait methods, and global time-out. - * PingPongProducer provides a good use-case example (sends messages, waits for replies). - * - * @todo New correlationId on every send? or correlation id per conversation? or callers choice. - */ -public class ConversationHelper -{ - /** Holds a map from correlation id's to queues. */ - private Map> idsToQueues = new HashMap>(); - - private Session session; - private MessageProducer producer; - private MessageConsumer consumer; - - Class queueClass; - - BlockingQueue deadLetterBox = new LinkedBlockingQueue(); - - ThreadLocal threadLocals = - new ThreadLocal() - { - protected PerThreadSettings initialValue() - { - PerThreadSettings settings = new PerThreadSettings(); - settings.conversationId = conversationIdGenerator.getAndIncrement(); - - return settings; - } - }; - - /** Generates new coversation id's as needed. */ - AtomicLong conversationIdGenerator = new AtomicLong(); - - /** - * Creates a conversation helper on the specified connection with the default sending destination, and listening - * to the specified receiving destination. - * - * @param connection The connection to build the conversation helper on. - * @param sendDestination The default sending destiation for all messages. - * @param receiveDestination The destination to listen to for incoming messages. - * @param queueClass The queue implementation class. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination, - Class queueClass) throws JMSException - { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(sendDestination); - consumer = session.createConsumer(receiveDestination); - - consumer.setMessageListener(new Receiver()); - - this.queueClass = queueClass; - } - - /** - * Sends a message to the default sending location. The correlation id of the message will be assigned by this - * method, overriding any previously set value. - * - * @param message The message to send. - * - * @throws JMSException All undelying JMSExceptions are allowed to fall through. - */ - public void send(Message message) throws JMSException - { - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - message.setJMSCorrelationID(Long.toString(conversationId)); - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - producer.send(message); - } - - /** - * Ensures that the reply queue for a conversation exists. - * - * @param conversationId The conversation correlation id. - */ - private void initQueueForId(long conversationId) - { - if (!idsToQueues.containsKey(conversationId)) - { - idsToQueues.put(conversationId, ReflectionUtils.newInstance(queueClass)); - } - } - - /** - * Gets the next message in an ongoing conversation. This method may block until such a message is received. - * - * @return The next incoming message in the conversation. - */ - public Message receive() - { - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - - // Ensure that the reply queue for this conversation exists. - initQueueForId(conversationId); - - BlockingQueue queue = idsToQueues.get(conversationId); - - try - { - return queue.take(); - } - catch (InterruptedException e) - { - return null; - } - } - - /** - * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are - * received they will be returned. If a timeout is specified, then all messages up to the limit, received within - * that timespan will be returned. - * - * @param num The number of messages to receive, or all if this is less than 1. - * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. - * - * @return All messages received within the count limit and the timeout. - */ - public Collection receiveAll(int num, long timeout) - { - Collection result = new ArrayList(); - - for (int i = 0; i < num; i++) - { - result.add(receive()); - } - - return result; - } - - /** - * Completes the conversation. Any open transactions are committed. Any correlation id's pertaining to the - * conversation are no longer valid, and any incoming messages using them will go to the dead letter box. - */ - public void end() - { - // Ensure that the thread local for the current thread is cleaned up. - PerThreadSettings settings = threadLocals.get(); - long conversationId = settings.conversationId; - threadLocals.remove(); - - // Ensure that its queue is removed from the queue map. - BlockingQueue queue = idsToQueues.remove(conversationId); - - // Move any outstanding messages on the threads conversation id into the dead letter box. - queue.drainTo(deadLetterBox); - } - - /** - * Clears the dead letter box, returning all messages that were in it. - * - * @return All messages in the dead letter box. - */ - public Collection emptyDeadLetterBox() - { - Collection result = new ArrayList(); - deadLetterBox.drainTo(result); - - return result; - } - - /** - * Implements the message listener for this conversation handler. - */ - protected class Receiver implements MessageListener - { - /** - * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id - * and placed into queues. - * - * @param message The incoming message. - */ - public void onMessage(Message message) - { - try - { - Long conversationId = Long.parseLong(message.getJMSCorrelationID()); - - // Find the converstaion queue to place the message on. If there is no conversation for the message id, - // the the dead letter box queue is used. - BlockingQueue queue = idsToQueues.get(conversationId); - queue = (queue == null) ? deadLetterBox : queue; - - queue.put(message); - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - protected class PerThreadSettings - { - /** Holds the correlation id for the current threads conversation. */ - long conversationId; - } - - public Session getSession() - { - return session; - } -} -- cgit v1.2.1