From 902481c5caf3a72538586a68cb779ddb9aa60c58 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 31 Jul 2007 22:34:12 +0000 Subject: Rolled back revision 561365 and commented out some broken code in ClientSession.java. The trunk should now build. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561578 13f79535-47bb-0310-9956-ffa450edef68 --- .../docs/RunningSustainedTests.txt | 14 +- java/integrationtests/pom.xml | 6 +- .../interop/clienttestcases/TestCase1DummyRun.java | 133 --- .../interop/clienttestcases/TestCase2BasicP2P.java | 207 ----- .../clienttestcases/TestCase3BasicPubSub.java | 237 ------ .../interop/coordinator/CoordinatingTestCase.java | 263 ++++++ .../qpid/interop/coordinator/Coordinator.java | 388 +++++++++ .../interop/coordinator/InvitingTestDecorator.java | 220 +++++ .../coordinator/ListeningCoordinatorTest.java | 28 + .../coordinator/ListeningTestDecorator.java | 200 +++++ .../qpid/interop/coordinator/OptOutTestCase.java | 65 ++ .../interop/coordinator/TestClientDetails.java | 87 ++ .../qpid/interop/coordinator/XMLTestListener.java | 402 +++++++++ .../testcases/CoordinatingTestCase1DummyRun.java | 85 ++ .../testcases/CoordinatingTestCase2BasicP2P.java | 90 ++ .../CoordinatingTestCase3BasicPubSub.java | 92 +++ .../java/org/apache/qpid/interop/old/Listener.java | 291 +++++++ .../org/apache/qpid/interop/old/Publisher.java | 244 ++++++ .../qpid/interop/testcases/CircuitTestCase.java | 101 --- .../testcases/InteropTestCase1DummyRun.java | 84 -- .../testcases/InteropTestCase2BasicP2P.java | 90 -- .../testcases/InteropTestCase3BasicPubSub.java | 88 -- .../interop/testclient/InteropClientTestCase.java | 104 +++ .../apache/qpid/interop/testclient/TestClient.java | 422 ++++++++++ .../testclient/testcases/TestCase1DummyRun.java | 96 +++ .../testclient/testcases/TestCase2BasicP2P.java | 214 +++++ .../testclient/testcases/TestCase3BasicPubSub.java | 249 ++++++ .../qpid/sustained/SustainedClientTestCase.java | 905 --------------------- .../apache/qpid/sustained/SustainedTestCase.java | 126 --- .../apache/qpid/sustained/SustainedTestClient.java | 8 +- .../qpid/sustained/SustainedTestCoordinator.java | 222 +++++ .../java/org/apache/qpid/sustained/TestClient.java | 157 ++++ .../org/apache/qpid/sustained/TestCoordinator.java | 117 +++ .../distributedtesting/InteropClientTestCase.java | 101 --- .../framework/distributedtesting/TestClient.java | 377 --------- .../org/apache/qpid/util/ClasspathScanner.java | 234 ++++++ .../org/apache/qpid/util/ConversationFactory.java | 479 +++++++++++ .../org/apache/qpid/interop/connection.properties | 20 + 38 files changed, 4780 insertions(+), 2466 deletions(-) delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java delete mode 100644 java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java create mode 100644 java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java create mode 100644 java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties (limited to 'java/integrationtests') diff --git a/java/integrationtests/docs/RunningSustainedTests.txt b/java/integrationtests/docs/RunningSustainedTests.txt index db4405a32d..2b37f4c5a7 100644 --- a/java/integrationtests/docs/RunningSustainedTests.txt +++ b/java/integrationtests/docs/RunningSustainedTests.txt @@ -1,17 +1,15 @@ In addition to the integration tests the framework provided by this package also allows for sustained tests to be run. Currently avaible tests: -- org.apache.qpid.sustained.SustainedClientTestCase : Pub Sub test to determine steady state throughput. +- org.apache.qpid.sustained.SustainedTestClient : Pub Sub test to determine steady state throughput. Running Tests. Run the tests as per the integration tests. -- Start a broker -- Start at least one test client [java org.apache.qpid.interop.TestClient], ensuring unique naming. - -- Start the test coordinator with the 'fanout' engine, on the sustained test case [java org.apache.qpid.test.framework.distributedtesting.Coordinator] - -- Additional Test clients can be started and joined into the running test: [java org.apache.qpid.interop.TestClient -j] - + - Start a broker + - Start at least one Client [java org.apache.qpid.sustained.TestClient], ensuring unique naming + - Start Test Controller [java org.apache.qpid.sustained.TestCoordinator] + - Additional Test clients can be started: + [java org.apache.qpid.sustained.TestClient -j org.apache.qpid.sustained.SustainedTestClient] diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml index 89fd5ede28..9ccd153f54 100644 --- a/java/integrationtests/pom.xml +++ b/java/integrationtests/pom.xml @@ -40,16 +40,12 @@ + org.apache.qpid qpid-client - - org.apache.qpid - qpid-systests - - org.slf4j slf4j-log4j12 diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java deleted file mode 100644 index b119d13a3d..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.clienttestcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; - -/** - * Implements tet case 1, dummy run. This test case sends no test messages, it exists to confirm that the test harness - * is interacting with the coordinator correctly. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Supply the name of the test case that this implements. - *
Accept/Reject invites based on test parameters. - *
Adapt to assigned roles. - *
Perform test case actions. - *
Generate test reports. - *
- */ -public class TestCase1DummyRun implements InteropClientTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(TestCase1DummyRun.class); - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the - * interop testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - log.debug("public String getName(): called"); - - return "TC1_DummyRun"; - } - - /** - * Determines whether the test invite that matched this test case is acceptable. - * - * @param inviteMessage The invitation to accept or reject. - * - * @return true to accept the invitation, false to reject it. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public boolean acceptInvite(Message inviteMessage) throws JMSException - { - log.debug("public boolean acceptInvite(Message inviteMessage): called"); - - // Test parameters don't matter, accept all invites. - return true; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the - * assignment message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receivers. - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); - - // Do nothing, both roles are the same. - } - - /** - * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. - */ - public void start() - { - log.debug("public void start(): called"); - - // Do nothing. - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - log.debug("public Message getReport(Session session): called"); - - // Generate a dummy report, the coordinator expects a report but doesn't care what it is. - return session.createTextMessage("Dummy Run, Ok."); - } - - /** - * Handles incoming test messages. Does nothing. - * - * @param message The incoming test message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - // Ignore any messages. - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java deleted file mode 100644 index 080bd846ee..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.clienttestcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase; -import org.apache.qpid.test.framework.distributedtesting.TestClient; -import org.apache.qpid.test.framework.TestUtils; - -import javax.jms.*; - -/** - * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the - * default direct exchange. Produces reports on the actual number of messages sent/received. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Supply the name of the test case that this implements. - *
Accept/Reject invites based on test parameters. - *
Adapt to assigned roles. - *
Send required number of test messages. - *
Generate test reports. - *
- */ -public class TestCase2BasicP2P implements InteropClientTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(TestCase2BasicP2P.class); - - /** Holds the count of test messages received. */ - private int messageCount; - - /** The role to be played by the test. */ - private Roles role; - - /** The number of test messages to send. */ - private int numMessages; - - /** The connection to send the test messages on. */ - private Connection connection; - - /** The session to send the test messages on. */ - private Session session; - - /** The producer to send the test messages with. */ - MessageProducer producer; - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the - * interop testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - log.debug("public String getName(): called"); - - return "TC2_BasicP2P"; - } - - /** - * Determines whether the test invite that matched this test case is acceptable. - * - * @param inviteMessage The invitation to accept or reject. - * - * @return true to accept the invitation, false to reject it. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public boolean acceptInvite(Message inviteMessage) throws JMSException - { - log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); - - // All invites are acceptable. - return true; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the - * assignment message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receivers. - * - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); - - // Reset the message count for a new test. - messageCount = 0; - - // Take note of the role to be played. - this.role = role; - - // Create a new connection to pass the test messages on. - connection = TestUtils.createConnection(TestClient.testContextProperties); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Extract and retain the test parameters. - numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); - Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); - - log.debug("numMessages = " + numMessages); - log.debug("sendDestination = " + sendDestination); - log.debug("role = " + role); - - switch (role) - { - // Check if the sender role is being assigned, and set up a message producer if so. - case SENDER: - producer = session.createProducer(sendDestination); - break; - - // Otherwise the receivers role is being assigned, so set this up to listen for messages. - case RECEIVER: - MessageConsumer consumer = session.createConsumer(sendDestination); - consumer.setMessageListener(this); - break; - } - - connection.start(); - } - - /** - * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void start() throws JMSException - { - log.debug("public void start(): called"); - - // Check that the sender role is being performed. - if (role.equals(Roles.SENDER)) - { - Message testMessage = session.createTextMessage("test"); - - for (int i = 0; i < numMessages; i++) - { - producer.send(testMessage); - - // Increment the message count. - messageCount++; - } - } - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - log.debug("public Message getReport(Session session): called"); - - // Close the test connection. - connection.close(); - - // Generate a report message containing the count of the number of messages passed. - Message report = session.createMessage(); - report.setStringProperty("CONTROL_TYPE", "REPORT"); - report.setIntProperty("MESSAGE_COUNT", messageCount); - - return report; - } - - /** - * Counts incoming test messages. - * - * @param message The incoming test message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - // Increment the message count. - messageCount++; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java deleted file mode 100644 index a11d045e89..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.clienttestcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.InteropClientTestCase; -import org.apache.qpid.test.framework.distributedtesting.TestClient; -import org.apache.qpid.test.framework.TestUtils; - -import javax.jms.*; - -/** - * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the - * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of - * messages sent/received. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Supply the name of the test case that this implements. - *
Accept/Reject invites based on test parameters. - *
Adapt to assigned roles. - *
Send required number of test messages using pub/sub. - *
Generate test reports. - *
- */ -public class TestCase3BasicPubSub implements InteropClientTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(TestCase3BasicPubSub.class); - - /** Holds the count of test messages received. */ - private int messageCount; - - /** The role to be played by the test. */ - private Roles role; - - /** The number of test messages to send. */ - private int numMessages; - - /** The connections to send/receive the test messages on. */ - private Connection[] connection; - - /** The sessions to send/receive the test messages on. */ - private Session[] session; - - /** The producer to send the test messages with. */ - MessageProducer producer; - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the - * interop testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - log.debug("public String getName(): called"); - - return "TC3_BasicPubSub"; - } - - /** - * Determines whether the test invite that matched this test case is acceptable. - * - * @param inviteMessage The invitation to accept or reject. - * - * @return true to accept the invitation, false to reject it. - * - * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public boolean acceptInvite(Message inviteMessage) throws JMSException - { - log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); - - // All invites are acceptable. - return true; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the - * assignment message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receivers. - * - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); - - // Reset the message count for a new test. - messageCount = 0; - - // Take note of the role to be played. - this.role = role; - - // Extract and retain the test parameters. - numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); - int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); - String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); - - log.debug("numMessages = " + numMessages); - log.debug("numReceivers = " + numReceivers); - log.debug("sendKey = " + sendKey); - log.debug("role = " + role); - - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - // Create a new connection to pass the test messages on. - connection = new Connection[1]; - session = new Session[1]; - - connection[0] = TestUtils.createConnection(TestClient.testContextProperties); - session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Extract and retain the test parameters. - Destination sendDestination = session[0].createTopic(sendKey); - - producer = session[0].createProducer(sendDestination); - break; - - // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number - // of receivers connections. - case RECEIVER: - // Create the required number of receivers connections. - connection = new Connection[numReceivers]; - session = new Session[numReceivers]; - - for (int i = 0; i < numReceivers; i++) - { - connection[i] = TestUtils.createConnection(TestClient.testContextProperties); - session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); - - sendDestination = session[i].createTopic(sendKey); - - MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(this); - } - - break; - } - - // Start all the connection dispatcher threads running. - for (Connection conn : connection) - { - conn.start(); - } - } - - /** - * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void start() throws JMSException - { - log.debug("public void start(): called"); - - // Check that the sender role is being performed. - if (role.equals(Roles.SENDER)) - { - Message testMessage = session[0].createTextMessage("test"); - - for (int i = 0; i < numMessages; i++) - { - producer.send(testMessage); - - // Increment the message count. - messageCount++; - } - } - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - log.debug("public Message getReport(Session session): called"); - - // Close the test connections. - for (Connection conn : connection) - { - conn.close(); - } - - // Generate a report message containing the count of the number of messages passed. - Message report = session.createMessage(); - report.setStringProperty("CONTROL_TYPE", "REPORT"); - report.setIntProperty("MESSAGE_COUNT", messageCount); - - return report; - } - - /** - * Counts incoming test messages. - * - * @param message The incoming test message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - // Increment the message count. - messageCount++; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java new file mode 100644 index 0000000000..d2042be741 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ConversationFactory; + +import javax.jms.*; + +import java.util.Map; + +/** + * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a + * test case as defined in the interop testing specification + * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). + * + *

The real logic of the test cases built on top of this, is embeded in the comparison of the sender and receiver + * reports. An example test method might look like: + * + *

+ * public void testExample()
+ * {
+ *   Properties testConfig = new Properties();
+ *   testConfig.add("TEST_CASE", "example");
+ *   ...
+ *
+ *   Report[] reports = sequenceTest(testConfig);
+ *
+ *   // Compare sender and receiver reports.
+ *   if (report[0] ... report[1] ...)
+ *   {
+ *     Assert.fail("Sender and receiver reports did not match up.");
+ *   }
+ * }
+ *
+ * 
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Accept notification of test case participants. {@link InvitingTestDecorator} + *
Accpet JMS Connection to carry out the coordination over. + *
Coordinate the test sequence amongst participants. {@link ConversationFactory} + *
Supply test properties + *
+ */ +public abstract class CoordinatingTestCase extends TestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase.class); + + /** Holds the contact details for the sending test client. */ + protected TestClientDetails sender; + + /** Holds the contact details for the receving test client. */ + protected TestClientDetails receiver; + + /** Holds the conversation factory over which to coordinate the test. */ + protected ConversationFactory conversationFactory; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase(String name) + { + super(name); + } + + /** + * Sets the sender test client to coordinate the test with. + * + * @param sender The contact details of the sending client in the test. + */ + public void setSender(TestClientDetails sender) + { + log.debug("public void setSender(TestClientDetails sender = " + sender + "): called"); + + this.sender = sender; + } + + /** + * Sets the receiving test client to coordinate the test with. + * + * @param receiver The contact details of the sending client in the test. + */ + public void setReceiver(TestClientDetails receiver) + { + log.debug("public void setReceiver(TestClientDetails receiver = " + receiver + "): called"); + + this.receiver = receiver; + } + + /** + * Supplies the sending test client. + * + * @return The sending test client. + */ + public TestClientDetails getSender() + { + return sender; + } + + /** + * Supplies the receiving test client. + * + * @return The receiving test client. + */ + public TestClientDetails getReceiver() + { + return receiver; + } + + /** + * Returns the name of the current test method of this test class, with the sending and receiving client names + * appended on to it, so that the resulting name unqiuely identifies the test and the clients that participated + * in it. + * + * @return The unique test and client name. + */ + public String getName() + { + if ((sender == null) || (receiver == null)) + { + return super.getName(); + } + else + { + return super.getName() + "_sender_" + sender.clientName + "_receiver_" + receiver.clientName; + } + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public abstract String getTestCaseNameForTestMethod(String methodName); + + /** + * Accepts the conversation factory over which to hold the test coordinating conversation. + * + * @param conversationFactory The conversation factory to coordinate the test over. + */ + public void setConversationFactory(ConversationFactory conversationFactory) + { + this.conversationFactory = conversationFactory; + } + + /** + * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner + * loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports + * from the participants. + * + * @param testProperties The test case definition. + * + * @return The test results from the senders and receivers. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected Message[] sequenceTest(Map testProperties) throws JMSException + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + // Assign the receiver role the receiving client. + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + // Wait for the senders and receivers to confirm their roles. + senderConversation.receive(); + receiverConversation.receive(); + + // Start the test. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { } + + // Ask the receiver for its report. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + receiverConversation.send(receiverControlTopic, statusRequest); + + // Wait for the receiver to send its report. + Message receiverReport = receiverConversation.receive(); + + return new Message[] { senderReport, receiverReport }; + } + + /** + * Sets properties of different types on a JMS Message. + * + * @param message The message to set properties on. + * @param properties The property name/value pairs to set. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void setPropertiesOnMessage(Message message, Map properties) throws JMSException + { + for (Map.Entry entry : properties.entrySet()) + { + String name = entry.getKey(); + Object value = entry.getValue(); + + message.setObjectProperty(name, value); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java new file mode 100644 index 0000000000..6eec20769a --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java @@ -0,0 +1,388 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import java.io.*; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import javax.jms.*; +import junit.framework.Test; +import junit.framework.TestResult; +import junit.framework.TestSuite; +import org.apache.log4j.Logger; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase1DummyRun; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase2BasicP2P; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; +import org.apache.qpid.interop.testclient.TestClient; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.ConversationFactory; +import org.apache.qpid.util.PrettyPrintingUtils; +import uk.co.thebadgerset.junit.extensions.TKTestResult; +import uk.co.thebadgerset.junit.extensions.TKTestRunner; +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +/** + *

Implements the coordinator client described in the interop testing specification + * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on + * top of the JUnit testing framework. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Find out what test clients are available. {@link ConversationFactory} + *
Decorate available tests to run all available clients. {@link InvitingTestDecorator} + *
Attach XML test result logger. + *
Terminate the interop testing framework. + *
+ */ +public class Coordinator extends TKTestRunner +{ + private static final Logger log = Logger.getLogger(Coordinator.class); + + public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; + + /** Holds the URL of the broker to coordinate the tests on. */ + protected String brokerUrl; + + /** Holds the virtual host to coordinate the tests on. If null, then the default virtual host is used. */ + protected String virtualHost; + + /** Holds the list of all clients that enlisted, when the compulsory invite was issued. */ + protected Set enlistedClients = new HashSet(); + + /** Holds the conversation helper for the control conversation. */ + protected ConversationFactory conversationFactory; + + /** Holds the connection that the coordinating messages are sent over. */ + protected Connection connection; + + /** + * Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult} + * method, but as the signature is already fixed for this, the current value gets pushed here as a member variable. + */ + protected String currentTestClassName; + + /** Holds the path of the directory to output test results too, if one is defined. */ + protected static String _reportDir; + + /** + * Creates an interop test coordinator on the specified broker and virtual host. + * + * @param brokerUrl The URL of the broker to connect to. + * @param virtualHost The virtual host to run all tests on. Optional, may be null. + */ + public Coordinator(String brokerUrl, String virtualHost) + { + log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called"); + + // Retain the connection parameters. + this.brokerUrl = brokerUrl; + this.virtualHost = virtualHost; + } + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + *

+ *
-b The broker URL. Mandatory. + *
-h The virtual host. Optional. + *
name=value Trailing argument define name/value pairs. Added to system properties. Optional. + *
+ * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exit if there are errors). + Properties options = + CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"o", "The name of the directory to output test timings to.", "dir", "false"} + })); + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + _reportDir = options.getProperty("o"); + _reportDir = (_reportDir == null) ? "." : _reportDir; + + // Scan for available test cases using a classpath scanner. + Collection> testCaseClasses = + new ArrayList>(); + // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true); + // Hard code the test classes till the classpath scanner is fixed. + Collections.addAll(testCaseClasses, + CoordinatingTestCase1DummyRun.class, + CoordinatingTestCase2BasicP2P.class, + CoordinatingTestCase3BasicPubSub.class); + + // Check that some test classes were actually found. + if (testCaseClasses.isEmpty()) + { + throw new RuntimeException( + "No test classes implementing CoordinatingTestCase were found on the class path."); + } + + int i = 0; + String[] testClassNames = new String[testCaseClasses.size()]; + + for (Class testClass : testCaseClasses) + { + testClassNames[i++] = testClass.getName(); + } + + // Create a coordinator and begin its test procedure. + Coordinator coordinator = new Coordinator(brokerUrl, virtualHost); + + boolean failure = false; + + TestResult testResult = coordinator.start(testClassNames); + + if (failure) + { + System.exit(FAILURE_EXIT); + } + else + { + System.exit(SUCCESS_EXIT); + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(EXCEPTION_EXIT); + } + } + + /** + * Starts all of the test classes to be run by this coordinator running. + * + * @param testClassNames An array of all the coordinating test case implementations. + * + * @return A JUnit TestResult to run the tests with. + * + * @throws Exception Any underlying exceptions are allowed to fall through, and fail the test process. + */ + public TestResult start(String[] testClassNames) throws Exception + { + log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames) + + ": called"); + + // Connect to the broker. + connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination controlTopic = session.createTopic("iop.control"); + Destination responseQueue = session.createQueue("coordinator"); + + conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + connection.start(); + + // Broadcast the compulsory invitation to find out what clients are available to test. + Message invite = session.createMessage(); + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setJMSReplyTo(responseQueue); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection enlists = conversation.receiveAll(0, 3000); + + enlistedClients = extractEnlists(enlists); + + // Run the test in the suite using JUnit. + TestResult result = null; + + for (String testClassName : testClassNames) + { + // Record the current test class, so that the test results can be output to a file incorporating this name. + this.currentTestClassName = testClassName; + + result = super.start(new String[]{testClassName}); + } + + // At this point in time, all tests have completed. Broadcast the shutdown message. + Message terminate = session.createMessage(); + terminate.setStringProperty("CONTROL_TYPE", "TERMINATE"); + + conversation.send(controlTopic, terminate); + + return result; + } + + /** + * For a collection of enlist messages, this method pulls out of the client details for the enlisting clients. + * + * @param enlists The enlist messages. + * + * @return A set of enlisting clients, extracted from the enlist messages. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public static Set extractEnlists(Collection enlists) throws JMSException + { + log.debug("public static Set extractEnlists(Collection enlists = " + enlists + + "): called"); + + Set enlistedClients = new HashSet(); + + // Retain the list of all available clients. + for (Message enlist : enlists) + { + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = enlist.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = enlist.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + enlistedClients.add(clientDetails); + } + + return enlistedClients; + } + + /** + * Runs a test or suite of tests, using the super class implemenation. This method wraps the test to be run + * in any test decorators needed to add in the coordinators ability to invite test clients to participate in + * tests. + * + * @param test The test to run. + * @param wait Undocumented. Nothing in the JUnit javadocs to say what this is for. + * + * @return The results of the test run. + */ + public TestResult doRun(Test test, boolean wait) + { + log.debug("public TestResult doRun(Test \"" + test + "\", boolean " + wait + "): called"); + + // Wrap all tests in the test suite with WrappedSuiteTestDecorators. This is quite ugly and a bit baffling, + // but the reason it is done is because the JUnit implementation of TestDecorator has some bugs in it. + WrappedSuiteTestDecorator targetTest = null; + + if (test instanceof TestSuite) + { + log.debug("targetTest is a TestSuite"); + + TestSuite suite = (TestSuite) test; + + int numTests = suite.countTestCases(); + log.debug("There are " + numTests + " in the suite."); + + for (int i = 0; i < numTests; i++) + { + Test nextTest = suite.testAt(i); + log.debug("suite.testAt(" + i + ") = " + nextTest); + + if (nextTest instanceof CoordinatingTestCase) + { + log.debug("nextTest is a CoordinatingTestCase"); + } + } + + targetTest = new WrappedSuiteTestDecorator(suite); + log.debug("Wrapped with a WrappedSuiteTestDecorator."); + } + // Wrap the tests in an inviting test decorator, to perform the invite/test cycle. + + targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + + TestSuite suite = new TestSuite(); + suite.addTest(targetTest); + + // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread. + // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 }); + + return super.doRun(suite, wait); + } + + protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set enlistedClients, ConversationFactory conversationFactory, Connection connection) + { + return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + } + + /** + * Creates the TestResult object to be used for test runs. + * + * @return An instance of the test result object. + */ + protected TestResult createTestResult() + { + log.debug("protected TestResult createTestResult(): called"); + + TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName); + + // Check if a directory to output reports to has been specified and attach test listeners if so. + if (_reportDir != null) + { + // Create the report directory if it does not already exist. + File reportDirFile = new File(_reportDir); + + if (!reportDirFile.exists()) + { + reportDirFile.mkdir(); + } + + // Create the timings file (make the name of this configurable as a command line parameter). + Writer timingsWriter = null; + + try + { + File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml"); + timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000); + } + catch (IOException e) + { + throw new RuntimeException("Unable to create the log file to write test results to: " + e, e); + } + + // Set up a CSV results listener to output the timings to the results file. + XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName); + result.addListener(listener); + result.addTKTestListener(listener); + + // Register the results listeners shutdown hook to flush its data if the test framework is shutdown + // prematurely. + // registerShutdownHook(listener); + + // Record the start time of the batch. + // result.notifyStartBatch(); + + // At this point in time the test class has been instantiated, giving it an opportunity to read its parameters. + // Inform any test listers of the test properties. + result.notifyTestProperties(TestContextProperties.getAccessedProps()); + } + + return result; + } + + public void setReportDir(String reportDir) + { + _reportDir = reportDir; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java new file mode 100644 index 0000000000..8695f7f66f --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java @@ -0,0 +1,220 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import java.util.*; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ConversationFactory; + +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Broadcast test invitations and collect enlists. {@link ConversationFactory}. + *
Output test failures for clients unwilling to run the test case. {@link Coordinator} + *
Execute coordinated test cases. {@link CoordinatingTestCase} + *
+ */ +public class InvitingTestDecorator extends WrappedSuiteTestDecorator +{ + private static final Logger log = Logger.getLogger(InvitingTestDecorator.class); + + /** Holds the contact information for all test clients that are available and that may take part in the test. */ + Set allClients; + + /** Holds the conversation helper for the control level conversation for coordinating the test through. */ + ConversationFactory conversationFactory; + + /** Holds the connection that the control conversation is held over. */ + Connection connection; + + /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ + WrappedSuiteTestDecorator testSuite; + + /** + * Creates a wrapped suite test decorator from another one. + * + * @param suite The test suite. + * @param availableClients The list of all clients that responded to the compulsory invite. + * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. + */ + public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite); + + log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + } + + /** + * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is + * then repeated for every combination of test clients (provided the wrapped test case extends + * {@link CoordinatingTestCase}. + * + *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions, + * resulting in the non-completion of the test run. + * + * @todo Better error recovery for failure of the invite/enlist conversation could be added. + * + * @param testResult The the results object to monitor the test results with. + */ + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection tests = testSuite.getAllUnderlyingTests(); + + for (Test test : tests) + { + CoordinatingTestCase coordTest = (CoordinatingTestCase) test; + + // Broadcast the invitation to find out what clients are available to test. + Set enlists; + try + { + Message invite = conversationFactory.getSession().createMessage(); + Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection replies = conversation.receiveAll(allClients.size(), 3000); + enlists = Coordinator.extractEnlists(replies); + } + catch (JMSException e) + { + throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); + } + + // Compare the list of willing clients to the list of all available. + Set optOuts = new HashSet(allClients); + optOuts.removeAll(enlists); + + // Output test failures for clients that will not particpate in the test. + Set> failPairs = allPairs(optOuts, allClients); + + for (List failPair : failPairs) + { + CoordinatingTestCase failTest = new OptOutTestCase("testOptOut"); + failTest.setSender(failPair.get(0)); + failTest.setReceiver(failPair.get(1)); + + failTest.run(testResult); + } + + // Loop over all combinations of clients, willing to run the test. + Set> enlistedPairs = allPairs(enlists, enlists); + + for (List enlistedPair : enlistedPairs) + { + // Set the sending and receiving client details on the test case. + coordTest.setSender(enlistedPair.get(0)); + coordTest.setReceiver(enlistedPair.get(1)); + + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + + // Execute the test case. + coordTest.run(testResult); + } + } + } + + /** + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. + */ + public String toString() + { + return "InvitingTestDecorator: [ testSuite = " + testSuite + " ]"; + } + + /** + * Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is + * important, that is the pair is distinct from ; both pairs are generated. For any element, i, in + * both the left and right sets, the reflexive pair is not generated. + * + * @param left The left set. + * @param right The right set. + * + * @return All pairs formed from the permutations of all elements of the left and right sets. + */ + private Set> allPairs(Set left, Set right) + { + log.debug("private Set> allPairs(Set left = " + left + ", Set right = " + right + "): called"); + + Set> results = new HashSet>(); + + // Form all pairs from left to right. + // Form all pairs from right to left. + for (E le : left) + { + for (E re : right) + { + if (!le.equals(re)) + { + results.add(new Pair(le, re)); + results.add(new Pair(re, le)); + } + } + } + + log.debug("results = " + results); + + return results; + } + + /** + * A simple implementation of a pair, using a list. + */ + private class Pair extends ArrayList + { + public Pair(T first, T second) + { + super(); + super.add(first); + super.add(second); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java new file mode 100644 index 0000000000..1b4461f8c2 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.interop.coordinator; + +import javax.jms.Message; + +public interface ListeningCoordinatorTest +{ + public void latejoin(Message message); +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java new file mode 100644 index 0000000000..4312dfbcc6 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.Test; +import junit.framework.TestResult; +import org.apache.log4j.Logger; +import org.apache.qpid.util.ConversationFactory; +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +/** + *

CRC Card
Responsibilities Collaborations
Broadcast test + * invitations and collect enlists. {@link ConversationFactory}.
Output test failures for clients + * unwilling to run the test case. {@link Coordinator}
Execute coordinated test cases. {@link + * CoordinatingTestCase}
+ */ +public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener +{ + private static final Logger log = Logger.getLogger(ListeningTestDecorator.class); + + /** Holds the contact information for all test clients that are available and that may take part in the test. */ + Set allClients; + + /** Holds the conversation helper for the control level conversation for coordinating the test through. */ + ConversationFactory conversationFactory; + + /** Holds the connection that the control conversation is held over. */ + Connection connection; + + /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */ + WrappedSuiteTestDecorator testSuite; + + /** Hold the current running test case. */ + CoordinatingTestCase _currentTest = null; + + /** + * Creates a wrapped suite test decorator from another one. + * + * @param suite The test suite. + * @param availableClients The list of all clients that responded to the compulsory invite. + * @param controlConversation The conversation helper for the control level, test coordination conversation. + * @param controlConnection The connection that the coordination messages are sent over. + */ + public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite); + + log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + } + + /** + * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then + * repeated for every combination of test clients (provided the wrapped test case extends {@link + * CoordinatingTestCase}. + * + *

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime + * exceptions, resulting in the non-completion of the test run. + * + * @param testResult The the results object to monitor the test results with. + * + * @todo Better error recovery for failure of the invite/enlist conversation could be added. + */ + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection tests = testSuite.getAllUnderlyingTests(); + + for (Test test : tests) + { + CoordinatingTestCase coordTest = (CoordinatingTestCase) test; + + Set enlists = signupClients(coordTest); + + if (enlists.size() == 0) + { + throw new RuntimeException("No clients to test with"); + } + + Iterator clients = enlists.iterator(); + coordTest.setSender(clients.next()); + + while (clients.hasNext()) + { + // Set the sending and receiving client details on the test case. + coordTest.setReceiver(clients.next()); + } + + // Pass down the connection to hold the coordination conversation over. + coordTest.setConversationFactory(conversationFactory); + + + if (coordTest instanceof ListeningCoordinatorTest) + { + _currentTest = coordTest; + } + // Execute the test case. + coordTest.run(testResult); + + _currentTest = null; + } + } + + private Set signupClients(CoordinatingTestCase coordTest) + { + // Broadcast the invitation to find out what clients are available to test. + Set enlists; + try + { + Message invite = conversationFactory.getSession().createMessage(); + Destination controlTopic = conversationFactory.getSession().createTopic("iop.control"); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection replies = conversation.receiveAll(allClients.size(), 5000); + + log.debug("Received " + replies.size() + " enlist replies"); + + enlists = Coordinator.extractEnlists(replies); + + //Create topic to listen on for latejoiners + Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + //Listen for joiners + conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this); + log.debug("Created consumer on :" + listenTopic); + } + catch (JMSException e) + { + throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); + } + + return enlists; + } + + /** + * Prints a string summarizing this test decorator, mainly for debugging purposes. + * + * @return String representation for debugging purposes. + */ + public String toString() + { + return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]"; + } + + + public void onMessage(Message message) + { + try + { + if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN")) + { + ((ListeningCoordinatorTest) _currentTest).latejoin(message); + } + } + catch (JMSException e) + { + log.debug("Unable to process message:" + message); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java new file mode 100644 index 0000000000..42a382a898 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/OptOutTestCase.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +import junit.framework.Assert; + +/** + * An OptOutTestCase is a test case that automatically fails. It is used when a list of test clients has been generated + * from a compulsory invite, but only some of those clients have responded to a specific test case invite. The clients + * that did not respond, are automatically given a fail for the test. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Fail the test with a suitable reason. + *
+ */ +public class OptOutTestCase extends CoordinatingTestCase +{ + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public OptOutTestCase(String name) + { + super(name); + } + + /** Generates an appropriate test failure assertion. */ + public void testOptOut() + { + Assert.fail("One of " + getSender() + " and " + getReceiver() + " opted out of the test."); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "OptOutTest"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java new file mode 100644 index 0000000000..c4a9d39cd8 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.coordinator; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class TestClientDetails +{ + /** The test clients name. */ + public String clientName; + + /* The test clients unique sequence number. Not currently used. */ + + /** The routing key of the test clients control topic. */ + public String privateControlKey; + + /** + * Two TestClientDetails are considered to be equal, iff they have the same client name. + * + * @param o The object to compare to. + * + * @return If the object to compare to is a TestClientDetails equal to this one, false otherwise. + */ + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (!(o instanceof TestClientDetails)) + { + return false; + } + + final TestClientDetails testClientDetails = (TestClientDetails) o; + + if ((clientName != null) ? (!clientName.equals(testClientDetails.clientName)) + : (testClientDetails.clientName != null)) + { + return false; + } + + return true; + } + + /** + * Computes a hash code compatible with the equals method; based on the client name alone. + * + * @return A hash code for this. + */ + public int hashCode() + { + return ((clientName != null) ? clientName.hashCode() : 0); + } + + /** + * Outputs the client name and address details. Mostly used for debugging purposes. + * + * @return The client name and address. + */ + public String toString() + { + return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java new file mode 100644 index 0000000000..747ba0dd0b --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/XMLTestListener.java @@ -0,0 +1,402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.interop.coordinator; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Writer; +import java.util.*; + +import junit.framework.AssertionFailedError; +import junit.framework.Test; +import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import uk.co.thebadgerset.junit.extensions.listeners.TKTestListener; + +/** + * Listens for test results for a named test and outputs these in the standard JUnit XML format to the specified + * writer. + * + *

The API for this listener accepts notifications about different aspects of a tests results through different + * methods, so some assumption needs to be made as to which test result a notification refers to. For example + * {@link #startTest} will be called, then possibly {@link #timing} will be called, even though the test instance is + * passed in both cases, it is not enough to distinguish a particular run of the test, as the test case instance may + * be being shared between multiple threads, or being run a repeated number of times, and can therfore be re-used + * between calls. The listeners make the assumption that, for every test, a unique thread will call {@link #startTest} + * and {@link #endTest} to delimit each test. All calls to set test parameters, timings, state and so on, will occur + * between the start and end and will be given with the same thread id as the start and end, so the thread id provides + * a unqiue value to identify a particular test run against. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ * + * @todo Merge this class with CSV test listener, making the collection of results common to both, and only factoring + * out the results printing code into sub-classes. Provide a simple XML results formatter with the same format as + * the ant XML formatter, and a more structured one for outputing results with timings and summaries from + * performance tests. + */ +public class XMLTestListener implements TKTestListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(XMLTestListener.class); + + /** The results file writer. */ + protected Writer writer; + + /** Holds the results for individual tests. */ + // protected Map results = new LinkedHashMap(); + // protected List results = new ArrayList(); + + /** + * Map for holding results on a per thread basis as they come in. A ThreadLocal is not used as sometimes an + * explicit thread id must be used, where notifications come from different threads than the ones that called + * the test method. + */ + Map threadLocalResults = Collections.synchronizedMap(new LinkedHashMap()); + + /** + * Holds results for tests that have ended. Transferring these results here from the per-thread results map, means + * that the thread id is freed for the thread to generate more results. + */ + List results = new ArrayList(); + + /** Holds the overall error count. */ + protected int errors = 0; + + /** Holds the overall failure count. */ + protected int failures = 0; + + /** Holds the overall tests run count. */ + protected int runs = 0; + + /** Holds the name of the class that tests are being run for. */ + String testClassName; + + /** + * Creates a new XML results output listener that writes to the specified location. + * + * @param writer The location to write results to. + */ + public XMLTestListener(Writer writer, String testClassName) + { + log.debug("public XMLTestListener(Writer writer, String testClassName = " + testClassName + "): called"); + + this.writer = writer; + this.testClassName = testClassName; + } + + /** + * Resets the test results to the default state of time zero, memory usage zero, parameter zero, test passed. + * + * @param test The test to resest any results for. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void reset(Test test, Long threadId) + { + log.debug("public void reset(Test test = " + test + ", Long threadId = " + threadId + "): called"); + + XMLTestListener.Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + + r.error = null; + r.failure = null; + + } + + /** + * A test started. + */ + public void startTest(Test test) + { + log.debug("public void startTest(Test test = " + test + "): called"); + + Result newResult = new Result(test.getClass().getName(), ((TestCase) test).getName()); + + // Initialize the thread local test results. + threadLocalResults.put(Thread.currentThread().getId(), newResult); + runs++; + } + + /** + * Should be called every time a test completes with the run time of that test. + * + * @param test The name of the test. + * @param nanos The run time of the test in nanoseconds. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void timing(Test test, long nanos, Long threadId) + { } + + /** + * Should be called every time a test completed with the amount of memory used before and after the test was run. + * + * @param test The test which memory was measured for. + * @param memStart The total JVM memory used before the test was run. + * @param memEnd The total JVM memory used after the test was run. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void memoryUsed(Test test, long memStart, long memEnd, Long threadId) + { } + + /** + * Should be called every time a parameterized test completed with the int value of its test parameter. + * + * @param test The test which memory was measured for. + * @param parameter The int parameter value. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void parameterValue(Test test, int parameter, Long threadId) + { } + + /** + * Should be called every time a test completes with the current number of test threads running. + * + * @param test The test for which the measurement is being generated. + * @param threads The number of tests being run concurrently. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void concurrencyLevel(Test test, int threads, Long threadId) + { } + + /** + * Notifies listeners of the tests read/set properties. + * + * @param properties The tests read/set properties. + */ + public void properties(Properties properties) + { } + + /** + * A test ended. + */ + public void endTest(Test test) + { + log.debug("public void endTest(Test test = " + test + "): called"); + + // Move complete test results into the completed tests list. + Result r = threadLocalResults.get(Thread.currentThread().getId()); + results.add(r); + + // Clear all the test results for the thread. + threadLocalResults.remove(Thread.currentThread().getId()); + } + + /** + * Called when a test completes. Success, failure and errors. This method should be used when registering an + * end test from a different thread than the one that started the test. + * + * @param test The test which completed. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void endTest(Test test, Long threadId) + { + log.debug("public void endTest(Test test = " + test + ", Long threadId = " + threadId + "): called"); + + // Move complete test results into the completed tests list. + Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + results.add(r); + + // Clear all the test results for the thread. + threadLocalResults.remove(Thread.currentThread().getId()); + } + + /** + * An error occurred. + */ + public void addError(Test test, Throwable t) + { + log.debug("public void addError(Test test = " + test + ", Throwable t = " + t + "): called"); + + Result r = threadLocalResults.get(Thread.currentThread().getId()); + r.error = t; + errors++; + } + + /** + * A failure occurred. + */ + public void addFailure(Test test, AssertionFailedError t) + { + log.debug("public void addFailure(Test test = " + test + ", AssertionFailedError t = " + t + "): called"); + + Result r = threadLocalResults.get(Thread.currentThread().getId()); + r.failure = t; + failures++; + } + + /** + * Called when a test completes to mark it as a test fail. This method should be used when registering a + * failure from a different thread than the one that started the test. + * + * @param test The test which failed. + * @param e The assertion that failed the test. + * @param threadId Optional thread id if not calling from thread that started the test method. May be null. + */ + public void addFailure(Test test, AssertionFailedError e, Long threadId) + { + log.debug("public void addFailure(Test test, AssertionFailedError e, Long threadId): called"); + + Result r = + (threadId == null) ? threadLocalResults.get(Thread.currentThread().getId()) : threadLocalResults.get(threadId); + r.failure = e; + failures++; + } + + /** + * Notifies listeners of the start of a complete run of tests. + */ + public void startBatch() + { + log.debug("public void startBatch(): called"); + + // Reset all results counts. + threadLocalResults = Collections.synchronizedMap(new HashMap()); + errors = 0; + failures = 0; + runs = 0; + + // Write out the file header. + try + { + writer.write("\n"); + } + catch (IOException e) + { + throw new RuntimeException("Unable to write the test results.", e); + } + } + + /** + * Notifies listeners of the end of a complete run of tests. + * + * @param parameters The optional test parameters to log out with the batch results. + */ + public void endBatch(Properties parameters) + { + log.debug("public void endBatch(Properties parameters = " + parameters + "): called"); + + // Write out the results. + try + { + // writer.write("\n"); + writer.write("\n"); + + for (Result result : results) + { + writer.write(" \n"); + + if (result.error != null) + { + writer.write(" "); + result.error.printStackTrace(new PrintWriter(writer)); + writer.write(" "); + } + else if (result.failure != null) + { + writer.write(" "); + result.failure.printStackTrace(new PrintWriter(writer)); + writer.write(" "); + } + + writer.write(" \n"); + } + + writer.write("\n"); + writer.flush(); + } + catch (IOException e) + { + throw new RuntimeException("Unable to write the test results.", e); + } + } + + /** + * Used to capture the results of a particular test run. + */ + protected static class Result + { + public Result(String testClass, String testName) + { + this.testClass = testClass; + this.testName = testName; + } + + public String testClass; + public String testName; + + /** Holds the exception that caused error in this test. */ + public Throwable error; + + /** Holds the assertion exception that caused failure in this test. */ + public AssertionFailedError failure; + + /** Holds the error count for this test. */ + // public int errors = 0; + + /** Holds the failure count for this tests. */ + // public int failures = 0; + + /** Holds the overall tests run count for this test. */ + // public int runs = 0; + + /*public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (!(o instanceof Result)) + { + return false; + } + + final Result result = (Result) o; + + if ((testClass != null) ? (!testClass.equals(result.testClass)) : (result.testClass != null)) + { + return false; + } + + if ((testName != null) ? (!testName.equals(result.testName)) : (result.testName != null)) + { + return false; + } + + return true; + } + + public int hashCode() + { + int result; + result = ((testClass != null) ? testClass.hashCode() : 0); + result = (29 * result) + ((testName != null) ? testName.hashCode() : 0); + + return result; + }*/ + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java new file mode 100644 index 0000000000..e642ef792b --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase1DummyRun.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.interop.coordinator.testcases; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Message; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.CoordinatingTestCase; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Exercises the interop testing framework without actually sending any test messages. + * {@link org.apache.qpid.interop.coordinator.CoordinatingTestCase} + *
+ */ +public class CoordinatingTestCase1DummyRun extends CoordinatingTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase1DummyRun.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase1DummyRun(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + */ + public void testDummyRun() throws Exception + { + log.debug("public void testDummyRun(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC1_DummyRun"); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC1_DummyRun"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java new file mode 100644 index 0000000000..b1b2d9f847 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase2BasicP2P.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.interop.coordinator.testcases; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Message; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.CoordinatingTestCase; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Setup p2p test parameters and compare with test output. {@link CoordinatingTestCase} + *
+ */ +public class CoordinatingTestCase2BasicP2P extends CoordinatingTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase2BasicP2P.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase2BasicP2P(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + */ + public void testBasicP2P() throws Exception + { + log.debug("public void testBasicP2P(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC2_BasicP2P"); + testConfig.put("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); + testConfig.put("P2P_NUM_MESSAGES", 50); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); + Assert.assertEquals("Sender and receiver messages sent did not match up.", messagesSent, messagesReceived); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC2_BasicP2P"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java new file mode 100644 index 0000000000..702c240e9a --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/CoordinatingTestCase3BasicPubSub.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.interop.coordinator.testcases; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Message; + +import junit.framework.Assert; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.coordinator.CoordinatingTestCase; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
Setup pub/sub test parameters and compare with test output. {@link CoordinatingTestCase} + *
+ */ +public class CoordinatingTestCase3BasicPubSub extends CoordinatingTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(CoordinatingTestCase3BasicPubSub.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public CoordinatingTestCase3BasicPubSub(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testBasicPubSub(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "TC3_BasicPubSub"); + testConfig.put("PUBSUB_KEY", "tc3route"); + testConfig.put("PUBSUB_NUM_MESSAGES", 10); + testConfig.put("PUBSUB_NUM_RECEIVERS", 5); + + Message[] reports = sequenceTest(testConfig); + + // Compare sender and receiver reports. + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); + Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, + messagesReceived); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC3_BasicPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java new file mode 100644 index 0000000000..5545f8d2dc --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Listener.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.old; + +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Listener implements the listening end of the Qpid interop tests. It is capable of being run as a standalone listener + * that responds to the test messages send by the publishing end of the tests implemented by {@link org.apache.qpid.interop.old.Publisher}. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Count messages received on a topic. {@link org.apache.qpid.interop.old.Publisher} + *
Send reports on messages received, when requested to. {@link org.apache.qpid.interop.old.Publisher} + *
Shutdown, when requested to. {@link org.apache.qpid.interop.old.Publisher} + *
+ * + * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with + * interop spec in mind. + * + * @todo I've added lots of field table types in the report message, just to check if the other end can decode them + * correctly. Not really the right place to test this, so remove them from {@link #sendReport()} once a better + * test exists. + */ +public class Listener implements MessageListener +{ + private static Logger log = Logger.getLogger(Listener.class); + + /** The default AMQ connection URL to use for tests. */ + public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; + + /** Holds the name of (routing key for) the topic to receive test messages on. */ + public static final String CONTROL_TOPIC = "topic_control"; + + /** Holds the name of (routing key for) the queue to send reports to. */ + public static final String RESPONSE_QUEUE = "response"; + + /** Holds the JMS Topic to receive test messages on. */ + private final Topic _topic; + + /** Holds the JMS Queue to send reports to. */ + private final Queue _response; + + /** Holds the connection to listen on. */ + private final Connection _connection; + + /** Holds the producer to send control messages on. */ + private final MessageProducer _controller; + + /** Holds the JMS session. */ + private final javax.jms.Session _session; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ + private boolean init; + + /** Holds the count of messages received by this listener. */ + private int count; + + /** Used to hold the start time of the first message. */ + private long start; + + /** + * Creates a topic listener using the specified broker URL. + * + * @param connectionUrl The broker URL to listen on. + * + * @throws AMQException If the broker connection cannot be established. + * @throws URLSyntaxException If the broker URL syntax is not correct. + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + Listener(String connectionUrl) throws AMQException, JMSException, URLSyntaxException + { + log.debug("Listener(String connectionUrl = " + connectionUrl + "): called"); + + // Create a connection to the broker. + _connection = new AMQConnection(connectionUrl); + + // Establish a session on the broker. + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set up the destinations to listen for test and control messages on. + _topic = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + + // Set this listener up to listen for incoming messages on the test topic. + _session.createConsumer(_topic).setMessageListener(this); + + // Set up this listener with a producer to send the reports on. + _controller = _session.createProducer(_response); + + _connection.start(); + System.out.println("Waiting for messages..."); + } + + /** + * Starts a test subscriber. The broker URL must be specified as the first command line argument. + * + * @param argv The command line arguments, ignored. + * + * @todo Add command line arguments to configure all aspects of the test. + */ + public static void main(String[] argv) + { + try + { + new Listener(DEFAULT_URI); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + /** + * Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and + * shutdown messages result in this listener being terminated. + * + * @param message The received message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Take the start time of the first message if this is the first message. + if (!init) + { + start = System.nanoTime() / 1000000; + count = 0; + init = true; + } + + try + { + // Check if the message is a control message telling this listener to shut down. + if (isShutdown(message)) + { + log.debug("Got a shutdown message."); + shutdown(); + } + // Check if the message is a report request message asking this listener to respond with the message count. + else if (isReport(message)) + { + log.debug("Got a report request message."); + + // Send the message count report. + sendReport(); + + // Reset the initialization flag so that the next message is considered to be the first. + init = false; + } + // Otherwise it is an ordinary test message, so increment the message count. + else + { + count++; + } + } + catch (JMSException e) + { + log.warn("There was a JMSException during onMessage.", e); + } + } + + /** + * Checks a message to see if it is a termination request control message. + * + * @param m The message to check. + * + * @return true if it is a termination request control message, false otherwise. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + return result; + } + + /** + * Checks a message to see if it is a report request control message. + * + * @param m The message to check. + * + * @return true if it is a report request control message, false otherwise. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + return result; + } + + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return trueIf the specified field has the specified value, fals otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException + { + //log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + // + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + //log.debug("comp = " + comp); + + boolean result = (comp != null) && comp.equals(value); + //log.debug("result = " + result); + + return result; + } + + /** + * Closes down the connection to the broker. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + private void shutdown() throws JMSException + { + _session.close(); + _connection.stop(); + _connection.close(); + } + + /** + * Send the report message to the response queue. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + private void sendReport() throws JMSException + { + log.debug("private void report(): called"); + + // Create the report message. + long time = ((System.nanoTime() / 1000000) - start); + String msg = "Received " + count + " in " + time + "ms"; + Message message = _session.createTextMessage(msg); + + // Shove some more field table types in the message just to see if the other end can handle it. + message.setBooleanProperty("BOOLEAN", true); + //message.setByteProperty("BYTE", (byte) 5); + message.setDoubleProperty("DOUBLE", Math.PI); + message.setFloatProperty("FLOAT", 1.0f); + message.setIntProperty("INT", 1); + message.setShortProperty("SHORT", (short) 1); + message.setLongProperty("LONG", (long) 1827361278); + message.setStringProperty("STRING", "hello"); + + // Send the report message. + _controller.send(message); + log.debug("Sent report: " + msg); + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java new file mode 100644 index 0000000000..f3a545f580 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/old/Publisher.java @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.old; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Publisher is the sending end of Qpid interop tests. It is capable of being run as a standalone publisher + * that sends test messages to the listening end of the tests implemented by {@link org.apache.qpid.interop.old.Listener}. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ * + * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with + * interop spec in mind. + * + * @todo I've added lots of field table types in the report request message, just to check if the other end can decode + * them correctly. Not really the right place to test this, so remove them from {@link #doTest()} once a better + * test exists. + */ +public class Publisher implements MessageListener +{ + private static Logger log = Logger.getLogger(Publisher.class); + + /** The default AMQ connection URL to use for tests. */ + public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; + + /** Holds the default test timeout for broker communications before tests give up. */ + public static final int TIMEOUT = 3000; + + /** Holds the routing key for the topic to send test messages on. */ + public static final String CONTROL_TOPIC = "topic_control"; + + /** Holds the routing key for the queue to receive reports on. */ + public static final String RESPONSE_QUEUE = "response"; + + /** Holds the JMS Topic to send test messages on. */ + private final Topic _topic; + + /** Holds the JMS Queue to receive reports on. */ + private final Queue _response; + + /** Holds the number of messages to send in each test run. */ + private int numMessages; + + /** A monitor used to wait for all reports to arrive back from consumers on. */ + private CountDownLatch allReportsReceivedEvt; + + /** Holds the connection to listen on. */ + private Connection _connection; + + /** Holds the channel for all test messages.*/ + private Session _session; + + /** Holds the producer to send test messages on. */ + private MessageProducer publisher; + + /** + * Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test + * subscribers. + * + * @param connectionUri The broker URL. + * @param numMessages The number of messages to send in each test. + * @param numSubscribers The number of subscribes that are expected to reply with a report. + */ + Publisher(String connectionUri, int numMessages, int numSubscribers) + throws AMQException, JMSException, URLSyntaxException + { + log.debug("Publisher(String connectionUri = " + connectionUri + ", int numMessages = " + numMessages + + ", int numSubscribers = " + numSubscribers + "): called"); + + // Create a connection to the broker. + _connection = new AMQConnection(connectionUri); + + // Establish a session on the broker. + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set up the destinations to send test messages and listen for reports on. + _topic = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + + // Set this listener up to listen for reports on the response queue. + _session.createConsumer(_response).setMessageListener(this); + + // Set up this listener with a producer to send the test messages and report requests on. + publisher = _session.createProducer(_topic); + + // Keep the test parameters. + this.numMessages = numMessages; + + // Set up a countdown to count all subscribers sending their reports. + allReportsReceivedEvt = new CountDownLatch(numSubscribers); + + _connection.start(); + System.out.println("Sending messages and waiting for reports..."); + } + + /** + * Start a test publisher. The broker URL must be specified as the first command line argument. + * + * @param argv The command line arguments, ignored. + * + * @todo Add command line arguments to configure all aspects of the test. + */ + public static void main(String[] argv) + { + try + { + // Create an instance of this publisher with the command line parameters. + Publisher publisher = new Publisher(DEFAULT_URI, 1, 1); + + // Publish the test messages. + publisher.doTest(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + /** + * Sends the test messages and waits for all subscribers to reply with a report. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void doTest() throws JMSException + { + log.debug("public void DoTest(): called"); + + // Create a test message to send. + Message testMessage = _session.createTextMessage("test"); + + // Send the desired number of test messages. + for (int i = 0; i < numMessages; i++) + { + publisher.send(testMessage); + } + + log.debug("Sent " + numMessages + " test messages."); + + // Send the report request. + Message reportRequestMessage = _session.createTextMessage("Report request message."); + reportRequestMessage.setStringProperty("TYPE", "REPORT_REQUEST"); + + reportRequestMessage.setBooleanProperty("BOOLEAN", false); + //reportRequestMessage.Headers.SetByte("BYTE", 5); + reportRequestMessage.setDoubleProperty("DOUBLE", 3.141); + reportRequestMessage.setFloatProperty("FLOAT", 1.0f); + reportRequestMessage.setIntProperty("INT", 1); + reportRequestMessage.setLongProperty("LONG", 1); + reportRequestMessage.setStringProperty("STRING", "hello"); + reportRequestMessage.setShortProperty("SHORT", (short) 2); + + publisher.send(reportRequestMessage); + + log.debug("Sent the report request message, waiting for all replies..."); + + // Wait until all the reports come in. + try + { + allReportsReceivedEvt.await(TIMEOUT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { } + + // Check if all reports were really received or if the timeout occurred. + if (allReportsReceivedEvt.getCount() == 0) + { + log.debug("Got all reports."); + } + else + { + log.debug("Waiting for reports timed out, still waiting for " + allReportsReceivedEvt.getCount() + "."); + } + + // Send the termination request. + Message terminationRequestMessage = _session.createTextMessage("Termination request message."); + terminationRequestMessage.setStringProperty("TYPE", "TERMINATION_REQUEST"); + publisher.send(terminationRequestMessage); + + log.debug("Sent the termination request message."); + + // Close all message producers and consumers and the connection to the broker. + shutdown(); + } + + /** + * Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes + * zero, at which time waiting threads are notified of this event. + * + * @param message The received report message. + */ + public void onMessage(Message message) + { + log.debug("public void OnMessage(Message message = " + message + "): called"); + + // Decrement the count of expected messages and release the wait monitor when this becomes zero. + allReportsReceivedEvt.countDown(); + + if (allReportsReceivedEvt.getCount() == 0) + { + log.debug("Got reports from all subscribers."); + } + } + + /** + * Stops the message consumers and closes the connection. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + private void shutdown() throws JMSException + { + _session.close(); + _connection.close(); + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java deleted file mode 100644 index 966a545e16..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/CircuitTestCase.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.testcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.sequencers.TestCaseSequencer; -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; - -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -/** - * CircuitTestCase runs a test over a {@link Circuit} controlled by the test parameters. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- *
- * - * @todo When working with test context properties, add overrides to defaults to the singleton instance, but when taking - * a starting point to add specific test case parameters to, take a copy. Use the copy with test case specifics - * to control the test. - */ -public class CircuitTestCase extends FrameworkBaseCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(CircuitTestCase.class); - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public CircuitTestCase(String name) - { - super(name); - } - - /** - * Performs the a basic P2P test case. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - public void testBasicP2P() throws Exception - { - log.debug("public void testBasicP2P(): called"); - - // Get the test parameters, any overrides on the command line will have been applied. - ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - // Customize the test parameters. - testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); - testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); - - // Get the test sequencer to create test circuits and run the standard test procedure through. - TestCaseSequencer sequencer = getTestSequencer(); - - // Send the test messages, and check that there were no errors doing so. - Circuit testCircuit = sequencer.createCircuit(testProps); - sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps); - - // Check that all of the message were sent. - // Check that the receiving end got the same number of messages as the publishing end. - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as known to the test - * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test - * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case - * name "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "DEFAULT_CIRCUIT_TEST"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java deleted file mode 100644 index 73e08b578e..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.testcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase; - -import java.util.Properties; - -/** - * Coordinates test case 1, from the interop test specification. This test connects up the sender and receivers roles, - * and gets some dummy test reports from them, in order to check that the test framework itself is operational. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Exercises the interop testing framework without actually sending any test messages. - * {@link org.apache.qpid.test.framework.distributedtesting.DistributedTestCase} - *
- */ -public class InteropTestCase1DummyRun extends DistributedTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(InteropTestCase1DummyRun.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public InteropTestCase1DummyRun(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - public void testDummyRun() throws Exception - { - log.debug("public void testDummyRun(): called"); - - Properties testConfig = new Properties(); - testConfig.put("TEST_NAME", "TC1_DummyRun"); - - /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig); - - // Compare sender and receivers reports. - // Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC1_DummyRun"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java deleted file mode 100644 index f77bbf032f..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.testcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase; - -import java.util.Properties; - -/** - * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50 - * messages. It checks that the sender and receivers reports both indicate that all the test messages were transmitted - * successfully. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Setup p2p test parameters and compare with test output. {@link DistributedTestCase} - *
- */ -public class InteropTestCase2BasicP2P extends DistributedTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public InteropTestCase2BasicP2P(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - public void testBasicP2P() throws Exception - { - log.debug("public void testBasicP2P(): called"); - - Properties testConfig = new Properties(); - testConfig.setProperty("TEST_NAME", "TC2_BasicP2P"); - testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); - testConfig.put("P2P_NUM_MESSAGES", 50); - - /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig); - - // Compare sender and receivers reports. - /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); - int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); - - Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); - Assert.assertEquals("Sender and receivers messages sent did not match up.", messagesSent, messagesReceived);*/ - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC2_BasicP2P"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java deleted file mode 100644 index ad27ca63bd..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.interop.testcases; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase; - -import java.util.Properties; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
Setup pub/sub test parameters and compare with test output. {@link DistributedTestCase} - *
- */ -public class InteropTestCase3BasicPubSub extends DistributedTestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class); - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public InteropTestCase3BasicPubSub(String name) - { - super(name); - } - - /** - * Performs the basic P2P test case, "Test Case 2" in the specification. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - public void testBasicPubSub() throws Exception - { - log.debug("public void testBasicPubSub(): called"); - - Properties testConfig = new Properties(); - testConfig.put("TEST_NAME", "TC3_BasicPubSub"); - testConfig.put("PUBSUB_KEY", "tc3route"); - testConfig.put("PUBSUB_NUM_MESSAGES", 10); - testConfig.put("PUBSUB_NUM_RECEIVERS", 5); - - /*Message[] reports =*/ getTestSequencer().sequenceTest(null, null, testConfig); - - // Compare sender and receivers reports. - /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); - int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); - - Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); - Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, - messagesReceived);*/ - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as defined in the - * interop testing specification. For example the method "testP2P" might map onto the interop test case name - * "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "TC3_BasicPubSub"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java new file mode 100644 index 0000000000..37952d08c8 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testclient; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * InteropClientTestCase provides an interface that classes implementing test cases from the interop testing spec + * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification) should implement. Implementations + * must be Java beans, that is, to provide a default constructor and to implement the {@link #getName} method. + * + *

+ *
CRC Card
Responsibilities + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Perform test case actions. + *
Generate test reports. + *
+ */ +public interface InteropClientTestCase extends MessageListener +{ + /** Defines the possible test case roles that an interop test case can take on. */ + public enum Roles + { + SENDER, RECEIVER; + } + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName(); + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return true to accept the invitation, false to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException; + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException; + + /** + * Performs the test case actions. + * return from here when you have finished the test.. this will signal the controller that the test has ended. + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void start() throws JMSException; + + /** + * Gives notice of termination of the test case actions. + * + * @throws JMSException Any JMSException resulting from allowed to fall through. + */ + public void terminate() throws JMSException, InterruptedException; + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException; +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java new file mode 100644 index 0000000000..a904bfa419 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -0,0 +1,422 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testclient; + +import org.apache.log4j.Logger; +import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun; +import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.PropertiesUtils; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Implements a test client as described in the interop testing spec + * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that + * reacts to control message sequences send by the test {@link org.apache.qpid.interop.coordinator.Coordinator}. + * + *

+ *
Messages Handled by TestClient
Message Action + *
Invite(compulsory) Reply with Enlist. + *
Invite(test case) Reply with Enlist if test case available. + *
AssignRole(test case) Reply with Accept Role if matches an enlisted test. Keep test parameters. + *
Start Send test messages defined by test parameters. Send report on messages sent. + *
Status Request Send report on messages received. + *
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Handle all incoming control messages. {@link InteropClientTestCase} + *
Configure and look up test cases by name. {@link InteropClientTestCase} + *
+ */ +public class TestClient implements MessageListener +{ + private static Logger log = Logger.getLogger(TestClient.class); + + public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; + public static final String CONNECTION_NAME = "broker"; + public static final String CLIENT_NAME = "java"; + public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties"; + + /** Holds the URL of the broker to run the tests on. */ + public static String brokerUrl; + + /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ + public static String virtualHost; + + /** Holds all the test cases loaded from the classpath. */ + Map testCases = new HashMap(); + + protected InteropClientTestCase currentTestCase; + + protected Connection _connection; + protected MessageProducer producer; + protected Session session; + + protected String clientName = CLIENT_NAME; + + /** + * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client + * identifying name. + * + * @param brokerUrl The url of the broker to connect to. + * @param virtualHost The virtual host to conect to. + * @param clientName The client name to use. + */ + public TestClient(String brokerUrl, String virtualHost, String clientName) + { + log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + + ", String clientName = " + clientName + "): called"); + + // Retain the connection parameters. + this.brokerUrl = brokerUrl; + this.virtualHost = virtualHost; + this.clientName = clientName; + } + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + *

+ *
-b The broker URL. Optional. + *
-h The virtual host. Optional. + *
-n The test client name. Optional. + *
name=value Trailing argument define name/value pairs. Added to system properties. Optional. + *
+ * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + // Use the command line parser to evaluate the command line. + CommandLineParser commandLine = + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"n", "The test client name.", "name", "false"} + }); + + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; + + try + { + options = commandLine.parseCommandLine(args); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String clientName = options.getProperty("n"); + + // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up + // overridden values from there. + commandLine.addCommandLineToSysProperties(); + + // Create a test client and start it running. + TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); + + // Use a class path scanner to find all the interop test case implementations. + Collection> testCaseClasses = + new ArrayList>(); + // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); + // Hard code the test classes till the classpath scanner is fixed. + Collections.addAll(testCaseClasses, + new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class}); + + try + { + client.start(testCaseClasses); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + System.exit(1); + } + } + + /** + * Starts the interop test client running. This causes it to start listening for incoming test invites. + * + * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses + */ + protected void start(Collection> testCaseClasses) throws JMSException + { + log.debug("private void start(): called"); + + // Create all the test case implementations and index them by the test names. + for (Class nextClass : testCaseClasses) + { + try + { + InteropClientTestCase testCase = nextClass.newInstance(); + testCases.put(testCase.getName(), testCase); + } + catch (InstantiationException e) + { + log.warn("Could not instantiate test case class: " + nextClass.getName(), e); + // Ignored. + } + catch (IllegalAccessException e) + { + log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); + // Ignored. + } + } + + // Open a connection to communicate with the coordinator on. + _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost); + + session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set this up to listen for control messages. + MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName)); + consumer.setMessageListener(this); + + MessageConsumer consumer2 = session.createConsumer(session.createTopic("iop.control")); + consumer2.setMessageListener(this); + + // Create a producer to send replies with. + producer = session.createProducer(null); + + // Start listening for incoming control messages. + _connection.start(); + } + + + public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) + { + return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost); + } + + /** + * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that + * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler. + * + * @param connectionPropsResource The name of the connection properties file. + * @param clientID + * @param brokerUrl The broker url to connect to, null to use the default from the + * properties. + * @param virtualHost The virtual host to connectio to, null to use the default. + * + * @return A JMS conneciton. + * + * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a + * Utils library class. + */ + public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost) + { + log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource + + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID + + ", String virtualHost = " + virtualHost + " ): called"); + + try + { + Properties connectionProps = + PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream( + connectionPropsResource)); + + if (brokerUrl != null) + { + String connectionString = + "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; + connectionProps.setProperty(CONNECTION_PROPERTY, connectionString); + } + + Context ctx = new InitialContext(connectionProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + Connection connection = cf.createConnection(); + + return connection; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (NamingException e) + { + throw new RuntimeException(e); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + } + + /** + * Handles all incoming control messages. + * + * @param message The incoming message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + String testName = message.getStringProperty("TEST_NAME"); + + log.info("onMessage(Message message = " + message + "): for '" + controlType + "' to '" + testName + "'"); + + // Check if the message is a test invite. + if ("INVITE".equals(controlType)) + { + // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites + // for which test cases exist. + boolean enlist = false; + + if (testName != null) + { + log.debug("Got an invite to test: " + testName); + + // Check if the requested test case is available. + InteropClientTestCase testCase = testCases.get(testName); + + if (testCase != null) + { + // Make the requested test case the current test case. + currentTestCase = testCase; + enlist = true; + } + else + { + log.warn("'" + testName + "' not part of this clients tests."); + } + } + else + { + log.debug("Got a compulsory invite."); + + enlist = true; + } + + if (enlist) + { + // Reply with the client name in an Enlist message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.info("Sending Message '" + enlistMessage + "'. to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + } + else if ("ASSIGN_ROLE".equals(controlType)) + { + // Assign the role to the current test case. + String roleName = message.getStringProperty("ROLE"); + + log.debug("Got a role assignment to role: " + roleName); + + InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName); + + currentTestCase.assignRole(role, message); + + // Reply by accepting the role in an Accept Role message. + Message acceptRoleMessage = session.createMessage(); + acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); + acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), acceptRoleMessage); + } + else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) + { + if ("START".equals(controlType)) + { + log.debug("Got a start notification."); + + // Start the current test case. + currentTestCase.start(); + } + else + { + log.debug("Got a status request."); + } + + // Generate the report from the test case and reply with it as a Report message. + Message reportMessage = currentTestCase.getReport(session); + reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); + reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), reportMessage); + } + else if ("TERMINATE".equals(controlType)) + { + log.info("Received termination instruction from coordinator."); + +// try +// { +// currentTestCase.terminate(); +// } +// catch (InterruptedException e) +// { +// // +// } + // Is a cleaner shutdown needed? + _connection.close(); + System.exit(0); + } + else + { + // Log a warning about this but otherwise ignore it. + log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); + } + } + catch (JMSException e) + { + // Log a warning about this, but otherwise ignore it. + log.warn("A JMSException occurred whilst handling a message."); + log.debug("Got JMSException whilst handling message: " + message, e); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java new file mode 100644 index 0000000000..5f257c0b36 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testclient.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.testclient.InteropClientTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +/** + * Implements tet case 1, dummy run. This test case sends no test messages, it exists to confirm that the test harness + * is interacting with the coordinator correctly. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Perform test case actions. + *
Generate test reports. + *
+ */ +public class TestCase1DummyRun implements InteropClientTestCase +{ + private static final Logger log = Logger.getLogger(TestCase1DummyRun.class); + + public String getName() + { + log.debug("public String getName(): called"); + + return "TC1_DummyRun"; + } + + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage): called"); + + // Test parameters don't matter, accept all invites. + return true; + } + + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); + + // Do nothing, both roles are the same. + } + + public void start() + { + log.debug("public void start(): called"); + + // Do nothing. + } + + public void terminate() throws JMSException + { + //todo + } + + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Generate a dummy report, the coordinator expects a report but doesn't care what it is. + return session.createTextMessage("Dummy Run, Ok."); + } + + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Ignore any messages. + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java new file mode 100644 index 0000000000..ff56ee9b93 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testclient.testcases; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.interop.testclient.TestClient; + +/** + * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the + * default direct exchange. Produces reports on the actual number of messages sent/received. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Send required number of test messages. + *
Generate test reports. + *
+ */ +public class TestCase2BasicP2P implements InteropClientTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase2BasicP2P.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The connection to send the test messages on. */ + private Connection connection; + + /** The session to send the test messages on. */ + private Session session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC2_BasicP2P"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return true to accept the invitation, false to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Create a new connection to pass the test messages on. + connection = + TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl, + TestClient.virtualHost); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); + sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); + + log.debug("numMessages = " + numMessages); + log.debug("sendDestination = " + sendDestination); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + producer = session.createProducer(sendDestination); + break; + + // Otherwise the receiver role is being assigned, so set this up to listen for messages. + case RECEIVER: + MessageConsumer consumer = session.createConsumer(sendDestination); + consumer.setMessageListener(this); + break; + } + + connection.start(); + } + + /** + * Performs the test case actions. + */ + public void start() throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = session.createTextMessage("test"); + + for (int i = 0; i < numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + public void terminate() throws JMSException + { + //todo + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Close the test connection. + connection.close(); + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java new file mode 100644 index 0000000000..7b35142c82 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.interop.testclient.testcases; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.interop.testclient.InteropClientTestCase; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of + * messages sent/received. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Supply the name of the test case that this implements. + *
Accept/Reject invites based on test parameters. + *
Adapt to assigned roles. + *
Send required number of test messages using pub/sub. + *
Generate test reports. + *
+ */ +public class TestCase3BasicPubSub implements InteropClientTestCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase3BasicPubSub.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The number of receiver connection to use. */ + private int numReceivers; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC3_BasicPubSub"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return true to accept the invitation, false to reject it. + * + * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receiver. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); + numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); + String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); + + log.debug("numMessages = " + numMessages); + log.debug("numReceivers = " + numReceivers); + log.debug("sendKey = " + sendKey); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.virtualHost); + session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + sendDestination = session[0].createTopic(sendKey); + + producer = session[0].createProducer(sendDestination); + break; + + // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number + // of receiver connections. + case RECEIVER: + // Create the required number of receiver connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.virtualHost); + session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + sendDestination = session[i].createTopic(sendKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + consumer.setMessageListener(this); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (int i = 0; i < connection.length; i++) + { + connection[i].start(); + } + } + + /** + * Performs the test case actions. + */ + public void start() throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = session[0].createTextMessage("test"); + + for (int i = 0; i < numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + public void terminate() throws JMSException, InterruptedException + { + //todo + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The session to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session session): called"); + + // Close the test connections. + for (int i = 0; i < connection.length; i++) + { + connection[i].close(); + } + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java deleted file mode 100644 index 65e05fab4b..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java +++ /dev/null @@ -1,905 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.test.framework.distributedtesting.TestClient; -import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub; -import org.apache.qpid.test.framework.TestUtils; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -/** - * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the - * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of - * messages sent/received. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Supply the name of the test case that this implements. - *
Accept/Reject invites based on test parameters. - *
Adapt to assigned roles. - *
Send required number of test messages using pub/sub.
Generate test reports. - *
- */ -public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(SustainedClientTestCase.class); - - /** Used to log to the console. */ - private static final Logger console = Logger.getLogger("SustainedTest"); - - /** The role to be played by the test. */ - private Roles role; - - /** The number of receivers connection to use. */ - private int numReceivers; - - /** The routing key to send them to on the default direct exchange. */ - private Destination sendDestination; - - /** The routing key to send updates to on the default direct exchange. */ - private Destination sendUpdateDestination; - - /** The connections to send/receive the test messages on. */ - private Connection[] connection; - - /** The sessions to send/receive the test messages on. */ - private Session[] session; - - /** The producer to send the test messages with. */ - MessageProducer producer; - - /** Adapter that adjusts the send rate based on the updates from clients. */ - SustainedRateAdapter _rateAdapter; - - /** */ - int _batchSize; - - private static final long TEN_MILLI_SEC = 10000000; - private static final int DEBUG_LOG_UPATE_INTERVAL = 10; - private static final int LOG_UPATE_INTERVAL = 10; - private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the interop - * testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName() - { - log.debug("public String getName(): called"); - - return "Perf_SustainedPubSub"; - } - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment - * message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receivers. - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException - { - log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage - + "): called"); - - // Take note of the role to be played. - this.role = role; - - // Extract and retain the test parameters. - numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); - _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); - String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); - String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); - int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); - String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); - - if (log.isDebugEnabled()) - { - log.debug("numReceivers = " + numReceivers); - log.debug("_batchSize = " + _batchSize); - log.debug("ackMode = " + ackMode); - log.debug("sendKey = " + sendKey); - log.debug("sendUpdateKey = " + sendUpdateKey); - log.debug("role = " + role); - } - - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - console.info("Creating Sender"); - // Create a new connection to pass the test messages on. - connection = new Connection[1]; - session = new Session[1]; - - connection[0] = TestUtils.createConnection(TestClient.testContextProperties); - session[0] = connection[0].createSession(false, ackMode); - - // Extract and retain the test parameters. - sendDestination = session[0].createTopic(sendKey); - - connection[0].setExceptionListener(this); - - producer = session[0].createProducer(sendDestination); - - sendUpdateDestination = session[0].createTopic(sendUpdateKey); - MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); - - _rateAdapter = new SustainedRateAdapter(this); - updateConsumer.setMessageListener(_rateAdapter); - - break; - - // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number - // of receivers connections. - case RECEIVER: - console.info("Creating Receiver"); - // Create the required number of receivers connections. - connection = new Connection[numReceivers]; - session = new Session[numReceivers]; - - for (int i = 0; i < numReceivers; i++) - { - connection[i] = TestUtils.createConnection(TestClient.testContextProperties); - session[i] = connection[i].createSession(false, ackMode); - - sendDestination = session[i].createTopic(sendKey); - - sendUpdateDestination = session[i].createTopic(sendUpdateKey); - - MessageConsumer consumer = session[i].createConsumer(sendDestination); - - consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], - sendUpdateDestination)); - } - - break; - } - - // Start all the connection dispatcher threads running. - for (int i = 0; i < connection.length; i++) - { - connection[i].start(); - } - } - - /** Performs the test case actions. */ - public void start() throws JMSException - { - log.debug("public void start(): called"); - - // Check that the sender role is being performed. - switch (role) - { - // Check if the sender role is being assigned, and set up a single message producer if so. - case SENDER: - _rateAdapter.run(); - break; - case RECEIVER: - - } - - // return from here when you have finished the test.. this will signal the controller and - } - - public void terminate() throws JMSException, InterruptedException - { - if (_rateAdapter != null) - { - _rateAdapter.stop(); - } - } - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException - { - log.debug("public Message getReport(Session session): called"); - - // Close the test connections. - for (int i = 0; i < connection.length; i++) - { - connection[i].close(); - } - - Message report = session.createMessage(); - report.setStringProperty("CONTROL_TYPE", "REPORT"); - - return report; - } - - public void onException(JMSException jmsException) - { - Exception linked = jmsException.getLinkedException(); - - if (linked != null) - { - if (log.isDebugEnabled()) - { - log.debug("Linked Exception:" + linked); - } - - if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException)) - { - if (log.isDebugEnabled()) - { - if (linked instanceof AMQNoConsumersException) - { - log.warn("No clients currently available for message:" - + ((AMQNoConsumersException) linked).getUndeliveredMessage()); - } - else - { - log.warn("No route for message"); - } - } - - // Tell the rate adapter that there are no clients ready yet - _rateAdapter.NO_CLIENTS = true; - } - } - else - { - log.warn("Exception:" + linked); - } - } - - /** - * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and - * 'end' messages. - */ - class SustainedListener implements MessageListener - { - /** Number of messages received */ - private long _received = 0; - /** The number of messages in the batch */ - private int _batchSize = 0; - /** Record of the when the 'start' messagse was sen */ - private Long _startTime; - /** Message producer to use to send reports */ - MessageProducer _updater; - /** Session to create the report message on */ - Session _session; - /** Record of the client ID used for this SustainedListnener */ - String _client; - - /** - * Main Constructor - * - * @param clientname The _client id used to identify this connection. - * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to - * control the interval between sending reports. - * @param session The session used for communication. - * @param sendDestination The destination that update reports should be sent to. - * - * @throws JMSException My occur if creatingthe Producer fails - */ - public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) - throws JMSException - { - _batchSize = batchSize; - _client = clientname; - _session = session; - _updater = session.createProducer(sendDestination); - } - - public void onMessage(Message message) - { - if (log.isTraceEnabled()) - { - log.trace("Message " + _received + "received in listener"); - } - - if (message instanceof TextMessage) - { - try - { - _received++; - if (((TextMessage) message).getText().equals("start")) - { - log.debug("Starting Batch"); - _startTime = System.nanoTime(); - } - else if (((TextMessage) message).getText().equals("end")) - { - if (_startTime != null) - { - long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); - log.debug("End Batch"); - } - } - } - catch (JMSException e) - { - // ignore error - } - } - - } - - /** - * sendStatus creates and sends the report back to the publisher - * - * @param time taken for the the last batch - * @param received Total number of messages received. - * @param batchNumber the batch number - * @throws JMSException if an error occurs during the send - */ - private void sendStatus(long time, long received, int batchNumber) throws JMSException - { - Message updateMessage = _session.createTextMessage("update"); - updateMessage.setStringProperty("CLIENT_ID", ":" + _client); - updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); - updateMessage.setLongProperty("RECEIVED", received); - updateMessage.setIntProperty("BATCH", batchNumber); - updateMessage.setLongProperty("DURATION", time); - - if (log.isInfoEnabled()) - { - log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - // Output on the main console.info the details of this batch - if ((batchNumber % 10) == 0) - { - console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received - + " BATCH:" + batchNumber + " DURATION:" + time); - } - - _updater.send(updateMessage); - } - } - - /** - * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second - * that are sent through the test system. - * - * By keeping a record of the messages recevied and the average time taken to process the batch size can be - * calculated and so the delay can be adjusted to maintain that rate. - * - * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no - * messages in the batch. Otherwise the delay is used at the end of the batch. - */ - class SustainedRateAdapter implements MessageListener, Runnable - { - private SustainedClientTestCase _client; - private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting - private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) - private volatile long _delay; // in nanos - private long _sent; - private Map _slowClients = new HashMap(); - private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms - private static final long NO_CLIENT_SLEEP = 1000; // 1s - private volatile boolean NO_CLIENTS = true; - private int _delayShifting; - private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); - private boolean _warmedup = false; - private static final long EXPECTED_TIME_PER_BATCH = 100000L; - private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); - - SustainedRateAdapter(SustainedClientTestCase client) - { - _client = client; - } - - public void onMessage(Message message) - { - if (log.isDebugEnabled()) - { - log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); - } - - try - { - String controlType = message.getStringProperty("CONTROL_TYPE"); - - // Check if the message is a test invite. - if ("UPDATE".equals(controlType)) - { - NO_CLIENTS = false; - long duration = message.getLongProperty("DURATION"); - long totalReceived = message.getLongProperty("RECEIVED"); - String client = message.getStringProperty("CLIENT_ID"); - int batchNumber = message.getIntProperty("BATCH"); - - if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) - { - log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:" - + batchNumber + " DURATION:" + duration); - } - - recordSlow(client, totalReceived, batchNumber); - - adjustDelay(client, batchNumber, duration); - - // Warm up completes when: - // we haven't warmed up - // and the number of batches sent to each client is at least half of the required warmup batches - if (!_warmedup && (batchNumber >= _warmUpBatches)) - { - _warmedup = true; - _warmup.countDown(); - - } - } - } - catch (JMSException e) - { - // - } - } - - CountDownLatch _warmup = new CountDownLatch(1); - - int _numBatches = 10000; - - // long[] _timings = new long[_numBatches]; - private boolean _running = true; - - public void run() - { - console.info("Warming up"); - - doBatch(_warmUpBatches); - - try - { - // wait for warmup to complete. - _warmup.await(); - - // set delay to the average length of the batches - _delay = _totalDuration / _warmUpBatches / delays.size(); - - console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration - + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size()); - - _totalDuration = 0L; - _totalReceived = 0L; - _sent = 0L; - } - catch (InterruptedException e) - { - // - } - - doBatch(_numBatches); - - } - - private void doBatch(int batchSize) // long[] timings, - { - TextMessage testMessage = null; - try - { - testMessage = _client.session[0].createTextMessage("start"); - - for (int batch = 0; batch <= batchSize; batch++) - // while (_running) - { - long start = System.nanoTime(); - - testMessage.setText("start"); - testMessage.setIntProperty("BATCH", batch); - - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - testMessage.setText("test"); - // start at 2 so start and end count as part of batch - for (int m = 2; m < _batchSize; m++) - { - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - } - - testMessage.setText("end"); - _client.producer.send(testMessage); - _rateAdapter.sentMessage(); - - long end = System.nanoTime(); - - long sendtime = end - start; - - if (log.isDebugEnabled()) - { - log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]); - } - - if ((batch % LOG_UPATE_INTERVAL) == 0) - { - console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); - } - - _rateAdapter.sleepBatch(); - - } - } - catch (JMSException e) - { - console.error("Runner ended"); - } - } - - private String status() - { - return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay - + " resulting in " - + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")); - } - - private void sleepBatch() - { - if (checkForSlowClients()) - { // if there werwe slow clients we have already slept so don't sleep anymore again. - return; - } - - if (!SLEEP_PER_MESSAGE) - { - // per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= (TEN_MILLI_SEC * _batchSize)) - { - sleepLong(_delay); - } - else - { - log.info("Not sleeping _delay > ten*batch is:" + _delay); - } - } - } - - public void stop() - { - _running = false; - } - - Map delays = new HashMap(); - Long _totalReceived = 0L; - Long _totalDuration = 0L; - int _skipUpdate = 0; - - /** - * Adjust the delay for sending messages based on this update from the client - * - * @param client The client that send this update - * @param duration The time taken for the last batch of messagse - * @param batchNumber The reported batchnumber from the client - */ - private void adjustDelay(String client, int batchNumber, long duration) - { - // Retrieve the current total time taken for this client. - Long currentTime = delays.get(client); - - // Add the new duration time to this client - if (currentTime == null) - { - currentTime = duration; - } - else - { - currentTime += duration; - } - - delays.put(client, currentTime); - - long batchesSent = _sent / _batchSize; - - // ensure we don't divide by zero - if (batchesSent == 0) - { - batchesSent = 1L; - } - - _totalReceived += _batchSize; - _totalDuration += duration; - - // calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchesSent; - - // calculate the difference between current send delay and average report delay - long diff = (duration) - averageDuration; - - if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) - { - log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: " - + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " - + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in " - + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"))); - } - - // if the averageDuration differs from the current by more than the specified variane then adjust delay. - if (Math.abs(diff) > _timeVariance) - { - - // if the the _delay is larger than the required duration to send report - // speed up - if (diff > TEN_MILLI_SEC) - { - _delay -= TEN_MILLI_SEC; - - if (_delay < 0) - { - _delay = 0; - log.info("Reset _delay to 0"); - delayStable(); - } - else - { - delayChanged(); - } - - } - else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance - { - // the report took longer - _delay += TEN_MILLI_SEC; - delayChanged(); - } - } - else - { - delayStable(); - } - - // If we have a consumer that is behind with the batches. - if ((batchesSent - batchNumber) > _batchVariance) - { - log.debug("Increasing _delay as sending more than receiving"); - - _delay += 2 * TEN_MILLI_SEC; - delayChanged(); - } - - } - - /** Reset the number of iterations before we say the delay has stabilised. */ - private void delayChanged() - { - _delayShifting = REPORTS_WITHOUT_CHANGE; - } - - /** - * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will - * output Delay stabilised - */ - private void delayStable() - { - _delayShifting--; - - if (_delayShifting < 0) - { - _delayShifting = 0; - console.debug("Delay stabilised:" + _delay); - } - } - - /** - * Checks that the client has received enough messages. If the client has fallen behind then they are put in the - * _slowClients lists which will increase the delay. - * - * @param client The client identifier to check - * @param received the number of messages received by that client - * @param batchNumber - */ - private void recordSlow(String client, long received, int batchNumber) - { - if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) - { - _slowClients.put(client, received); - } - else - { - _slowClients.remove(client); - } - } - - /** Incrment the number of sent messages and then sleep, if required. */ - public void sentMessage() - { - - _sent++; - - if (_delay > (TEN_MILLI_SEC * _batchSize)) - { - long batchDelay = _delay / _batchSize; - // less than 10ms sleep doesn't always work. - // _delay is in nano seconds - // if (batchDelay < (TEN_MILLI_SEC)) - // { - // sleep(0, (int) batchDelay); - // } - // else - { - // if (batchDelay < 30000000000L) - { - sleepLong(batchDelay); - } - } - } - else - { - if (SLEEP_PER_MESSAGE && (_delay > 0)) - { - sleepLong(_delay / _batchSize); - } - } - } - - /** - * Check at the end of each batch and pause sending messages to allow slow clients to catch up. - * - * @return true if there were slow clients that caught up. - */ - private boolean checkForSlowClients() - { - // This will allways be true as we are running this at the end of each batchSize - // if (_sent % _batchSize == 0) - { - // Cause test to pause when we have slow - if (!_slowClients.isEmpty() || NO_CLIENTS) - { - - while (!_slowClients.isEmpty()) - { - if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0)) - { - String clients = ""; - Iterator it = _slowClients.keySet().iterator(); - while (it.hasNext()) - { - clients += it.next(); - if (it.hasNext()) - { - clients += ", "; - } - } - - log.info("Pausing for slow clients:" + clients); - } - - if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)) - { - console.debug(_slowClients.size() + " slow clients."); - } - - sleep(PAUSE_SLEEP); - } - - if (NO_CLIENTS) - { - sleep(NO_CLIENT_SLEEP); - } - - log.debug("Continuing"); - - return true; - } - else - { - if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0) - { - console.info("Total Delay :" + _delay + " " - + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")"))); - } - } - - } - - return false; - } - - /** - * Sleep normally takes micro-seconds this allows the use of a nano-second value. - * - * @param delay nanoseconds to sleep for. - */ - private void sleepLong(long delay) - { - sleep(delay / 1000000, (int) (delay % 1000000)); - } - - /** - * Sleep for the specified micro-seconds. - * @param sleep microseconds to sleep for. - */ - private void sleep(long sleep) - { - sleep(sleep, 0); - } - - /** - * Perform the sleep , swallowing any InteruptException. - * - * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * - * @param milli to sleep for - * @param nano sub miliseconds to sleep for - */ - private void sleep(long milli, int nano) - { - try - { - log.debug("Sleep:" + milli + ":" + nano); - if (milli > 10000) - { - - if (_delay == milli) - { - _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; - log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) - + "s. Reset _totalDuration:" + _totalDuration); - } - else - { - log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s"); - } - - milli = 5000; - } - - Thread.sleep(milli, nano); - } - catch (InterruptedException e) - { - // - } - } - - public void setClient(SustainedClientTestCase client) - { - _client = client; - } - } - -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java deleted file mode 100644 index 36f9b4eaf1..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.sustained; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.framework.distributedtesting.DistributedTestCase; -import org.apache.qpid.test.framework.DropInTest; - -import javax.jms.JMSException; -import javax.jms.Message; - -import java.util.Properties; - -/** - * SustainedTestCase is a {@link org.apache.qpid.test.framework.distributedtesting.DistributedTestCase} that runs the "Perf_SustainedPubSub" test case. This consists of one - * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast - * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- *
- */ -public class SustainedTestCase extends DistributedTestCase implements DropInTest -{ - /** Used for debugging. */ - Logger log = Logger.getLogger(SustainedTestCase.class); - - /** Holds the root name of the topic on which to send the test messages. */ - private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; - - /** - * Creates a new coordinating test case with the specified name. - * - * @param name The test case name. - */ - public SustainedTestCase(String name) - { - super(name); - } - - /** - * Performs a single test run of the sustained test. - * - * @throws Exception Any exceptions are allowed to fall through and fail the test. - */ - public void testBasicPubSub() throws Exception - { - log.debug("public void testSinglePubSubCycle(): called"); - - Properties testConfig = new Properties(); - testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); - testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); - testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); - testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); - testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); - testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); - - log.info("Created Config: " + testConfig.entrySet().toArray()); - - getTestSequencer().sequenceTest(null, null, testConfig); - } - - /** - * Accepts a late joining client into this test case. The client will be enlisted with a control message - * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields: - * - *

- *
CLIENT_NAME A unique name for the new client. - *
CLIENT_PRIVATE_CONTROL_KEY The key for the route on which the client receives its control messages. - *
- * - * @param message The late joiners join message. - * - * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed. - */ - public void lateJoin(Message message) throws JMSException - { - throw new RuntimeException("Not implemented."); - /* - // Extract the joining clients details from its join request message. - TestClientDetails clientDetails = new TestClientDetails(); - clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); - clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); - - // Register the joining client, but do block for confirmation as cannot do a synchronous receivers during this - // method call, as it may have been called from an 'onMessage' method. - assignReceiverRole(clientDetails, new Properties(), false); - */ - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as known to the test - * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test - * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case - * name "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - return "Perf_SustainedPubSub"; - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index 79707bafa5..6f2089290a 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -425,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _batchVariance = Integer.getInteger("batchVariance", 3); //no. batches to allow drifting + private long _batchVariance = 3; //no. batches to allow drifting private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; @@ -434,11 +434,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti private static final long NO_CLIENT_SLEEP = 1000; // 1s private volatile boolean NO_CLIENTS = true; private int _delayShifting; - private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); + private static final int REPORTS_WITHOUT_CHANGE = 5; private boolean _warmedup = false; private static final long EXPECTED_TIME_PER_BATCH = 100000L; - private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); - SustainedRateAdapter(SustainedTestClient client) { @@ -495,6 +493,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti CountDownLatch _warmup = new CountDownLatch(1); + int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); + int _numBatches = 10000; // long[] _timings = new long[_numBatches]; diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java new file mode 100644 index 0000000000..0075e45a8c --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest; +import org.apache.qpid.interop.coordinator.TestClientDetails; +import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub; +import org.apache.qpid.util.ConversationFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class); + private List _receivers; + private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; + Map _testProperties; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public SustainedTestCoordinator(String name) + { + super(name); + _receivers = new LinkedList(); + } + + /** + * Adds a receiver to this test. + * + * @param receiver The contact details of the sending client in the test. + */ + public void setReceiver(TestClientDetails receiver) + { + _receivers.add(receiver); + } + + + /** + * Performs the a single test run + * + * @throws Exception if there was a problem running the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testSinglePubSubCycle(): called"); + + Map testConfig = new HashMap(); + testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); + testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); + testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); + testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); + testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); + + log.info("Created Config: " + testConfig.entrySet().toArray()); + + sequenceTest(testConfig); + } + + /** + * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop + * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the + * participants. + * + * @param testProperties The test case definition. + * + * @return The test results from the senders and receivers. + * + * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. + */ + protected Message[] sequenceTest(Map testProperties) throws JMSException + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + //Assign and wait for the receiver ckuebts to be ready. + _testProperties = testProperties; + + // Wait for the senders to confirm their roles. + senderConversation.receive(); + + assignReceivers(); + + // Start the test. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + } + + // Ask the receiver for its report. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + + return new Message[]{senderReport}; + } + + private void assignReceivers() + { + for (TestClientDetails receiver : _receivers) + { + registerReceiver(receiver); + } + } + + private void registerReceiver(TestClientDetails receiver) + { + log.info("registerReceiver called for receiver:" + receiver); + try + { + Session session = conversationFactory.getSession(); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + // Assign the receiver role the receiving client. + Message assignReceiver = session.createMessage(); + setPropertiesOnMessage(assignReceiver, _testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + //Don't wait for receiver to be ready.... we can't this is being done in + // the dispatcher thread, and most likely the acceptance message we + // want is sitting in the Dispatcher._queue waiting its turn for being + // dispatched so if we block here we won't can't get the message. + // So assume consumer is ready for action. + //receiverConversation.receive(); + } + catch (JMSException e) + { + log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage()); + } + } + + public void latejoin(Message message) + { + try + { + + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + + registerReceiver(clientDetails); + } + catch (JMSException e) + { + //swallow + } + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the interop + * testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "Perf_SustainedPubSub"; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java new file mode 100644 index 0000000000..44fc090410 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; +import org.apache.qpid.interop.testclient.InteropClientTestCase; +import org.apache.qpid.util.CommandLineParser; + +import javax.jms.JMSException; +import javax.jms.Message; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; + +public class TestClient extends org.apache.qpid.interop.testclient.TestClient +{ + private static Logger log = Logger.getLogger(TestClient.class); + + /** + * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client + * identifying name. + * + * @param brokerUrl The url of the broker to connect to. + * @param virtualHost The virtual host to conect to. + * @param clientName The client name to use. + */ + public TestClient(String brokerUrl, String virtualHost, String clientName) + { + super(brokerUrl, virtualHost, clientName); + } + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + *

-b The broker URL. Optional.
-h The virtual + * host. Optional.
-n The test client name. Optional.
name=value + * Trailing argument define name/value pairs. Added to system properties. Optional.
+ * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + // Use the command line parser to evaluate the command line. + CommandLineParser commandLine = + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"n", "The test client name.", "name", "false"}, + {"j", "Join this test client to running test.", "join", ""} + }); + + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; + + try + { + options = commandLine.parseCommandLine(args); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String clientName = options.getProperty("n"); + String join = options.getProperty("j"); + + // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up + // overridden values from there. + commandLine.addCommandLineToSysProperties(); + + // Create a test client and start it running. + TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName); + + // Use a class path scanner to find all the interop test case implementations. + Collection> testCaseClasses = + new ArrayList>(); + // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); + // Hard code the test classes till the classpath scanner is fixed. + Collections.addAll(testCaseClasses, + SustainedTestClient.class); + + + try + { + client.start(testCaseClasses, join); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + System.exit(1); + } + } + + protected void start(Collection> testCaseClasses, String join) throws JMSException, ClassNotFoundException + { + super.start(testCaseClasses); + log.debug("private void start(): called"); + + if (join != null && !join.equals("")) + { + Message latejoin = session.createMessage(); + + try + { + Object test = Class.forName(join).newInstance(); + if (test instanceof InteropClientTestCase) + { + currentTestCase = (InteropClientTestCase) test; + } + else + { + throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase."); + } + + latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN"); + latejoin.setStringProperty("CLIENT_NAME", clientName); + latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin); + } + catch (InstantiationException e) + { + log.warn("Unable to request latejoining of test:" + currentTestCase); + } + catch (IllegalAccessException e) + { + log.warn("Unable to request latejoining of test:" + currentTestCase); + } + } + } + +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java new file mode 100644 index 0000000000..7e12fe39fb --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.sustained; + +import org.apache.qpid.interop.coordinator.Coordinator; +import org.apache.qpid.interop.coordinator.ListeningTestDecorator; +import org.apache.qpid.interop.coordinator.TestClientDetails; +import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.ConversationFactory; +import org.apache.log4j.Logger; + +import java.util.Properties; +import java.util.Set; + +import junit.framework.TestResult; +import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +import javax.jms.Connection; + +public class TestCoordinator extends Coordinator +{ + + private static final Logger log = Logger.getLogger(TestCoordinator.class); + + /** + * Creates an interop test coordinator on the specified broker and virtual host. + * + * @param brokerUrl The URL of the broker to connect to. + * @param virtualHost The virtual host to run all tests on. Optional, may be null. + */ + TestCoordinator(String brokerUrl, String virtualHost) + { + super(brokerUrl, virtualHost); + } + + protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set enlistedClients, ConversationFactory conversationFactory, Connection connection) + { + return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + } + + + /** + * The entry point for the interop test coordinator. This client accepts the following command line arguments: + * + *

-b The broker URL. Mandatory.
-h The virtual host. + * Optional.
name=value Trailing argument define name/value pairs. Added to system properties. + * Optional.
+ * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + Properties options = + CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + {"b", "The broker URL.", "broker", "false"}, + {"h", "The virtual host to use.", "virtual host", "false"}, + {"o", "The name of the directory to output test timings to.", "dir", "false"} + })); + + // Extract the command line options. + String brokerUrl = options.getProperty("b"); + String virtualHost = options.getProperty("h"); + String reportDir = options.getProperty("o"); + reportDir = (reportDir == null) ? "." : reportDir; + + + String[] testClassNames = {SustainedTestCoordinator.class.getName()}; + + // Create a coordinator and begin its test procedure. + Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost); + + coordinator.setReportDir(reportDir); + + TestResult testResult = coordinator.start(testClassNames); + + if (testResult.failureCount() > 0) + { + System.exit(FAILURE_EXIT); + } + else + { + System.exit(SUCCESS_EXIT); + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(EXCEPTION_EXIT); + } + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java deleted file mode 100644 index 5e6d61a9e0..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/InteropClientTestCase.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.framework.distributedtesting; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Session; - -/** - * InteropClientTestCase provides an interface that classes implementing test cases from the interop testing spec - * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification) should implement. Implementations - * must be Java beans, that is, to provide a default constructor and to implement the {@link #getName} method. - * - *

- *
CRC Card
Responsibilities - *
Supply the name of the test case that this implements. - *
Accept/Reject invites based on test parameters. - *
Adapt to assigned roles. - *
Perform test case actions. - *
Generate test reports. - *
- */ -public interface InteropClientTestCase extends MessageListener -{ - /** Defines the possible test case roles that an interop test case can take on. */ - public enum Roles - { - /** Specifies the sender role. */ - SENDER, - - /** Specifies the receivers role. */ - RECEIVER - } - - /** - * Should provide the name of the test case that this class implements. The exact names are defined in the - * interop testing spec. - * - * @return The name of the test case that this implements. - */ - public String getName(); - - /** - * Determines whether the test invite that matched this test case is acceptable. - * - * @param inviteMessage The invitation to accept or reject. - * - * @return true to accept the invitation, false to reject it. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public boolean acceptInvite(Message inviteMessage) throws JMSException; - - /** - * Assigns the role to be played by this test case. The test parameters are fully specified in the - * assignment message. When this method return the test case will be ready to execute. - * - * @param role The role to be played; sender or receivers. - * @param assignRoleMessage The role assingment message, contains the full test parameters. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void assignRole(Roles role, Message assignRoleMessage) throws JMSException; - - /** - * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. - * - * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. - */ - public void start() throws JMSException; - - /** - * Gets a report on the actions performed by the test case in its assigned role. - * - * @param session The session to create the report message in. - * - * @return The report message. - * - * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. - */ - public Message getReport(Session session) throws JMSException; -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java deleted file mode 100644 index 12c0d0aa69..0000000000 --- a/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.framework.distributedtesting; - -import org.apache.log4j.Logger; - -import org.apache.qpid.interop.clienttestcases.TestCase1DummyRun; -import org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P; -import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub; -import org.apache.qpid.sustained.SustainedClientTestCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import org.apache.qpid.test.framework.TestUtils; - -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * Implements a test client as described in the interop testing spec - * (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that - * reacts to control message sequences send by the test {@link org.apache.qpid.test.framework.distributedtesting.Coordinator}. - * - *

- *
Messages Handled by SustainedTestClient
Message Action - *
Invite(compulsory) Reply with Enlist. - *
Invite(test case) Reply with Enlist if test case available. - *
AssignRole(test case) Reply with Accept Role if matches an enlisted test. Keep test parameters. - *
Start Send test messages defined by test parameters. Send report on messages sent. - *
Status Request Send report on messages received. - *
Terminate Terminate the test client. - *
- * - *

- *
CRC Card
Responsibilities Collaborations - *
Handle all incoming control messages. {@link InteropClientTestCase} - *
Configure and look up test cases by name. {@link InteropClientTestCase} - *
- */ -public class TestClient implements MessageListener -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(TestClient.class); - - /** Used for reporting to the console. */ - private static final Logger console = Logger.getLogger("CONSOLE"); - - /** Holds the default identifying name of the test client. */ - public static final String CLIENT_NAME = "java"; - - /** Holds the URL of the broker to run the tests on. */ - public static String brokerUrl; - - /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ - public static String virtualHost; - - /** - * Holds the test context properties that provides the default test parameters, plus command line overrides. - * This is initialized with the default test parameters, to which command line overrides may be applied. - */ - public static ParsedProperties testContextProperties = - TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - /** Holds all the test cases loaded from the classpath. */ - Map testCases = new HashMap(); - - /** Holds the test case currently being run by this client. */ - protected InteropClientTestCase currentTestCase; - - /** Holds the connection to the broker that the test is being coordinated on. */ - protected Connection connection; - - /** Holds the message producer to hold the test coordination over. */ - protected MessageProducer producer; - - /** Holds the JMS session for the test coordination. */ - protected Session session; - - /** Holds the name of this client, with a default value. */ - protected String clientName = CLIENT_NAME; - - /** This flag indicates that the test client should attempt to join the currently running test case on start up. */ - protected boolean join; - - /** - * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client - * identifying name. - * - * @param brokerUrl The url of the broker to connect to. - * @param virtualHost The virtual host to conect to. - * @param clientName The client name to use. - */ - public TestClient(String brokerUrl, String virtualHost, String clientName, boolean join) - { - log.debug("public SustainedTestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost - + ", String clientName = " + clientName + "): called"); - - // Retain the connection parameters. - this.brokerUrl = brokerUrl; - this.virtualHost = virtualHost; - this.clientName = clientName; - this.join = join; - } - - /** - * The entry point for the interop test coordinator. This client accepts the following command line arguments: - * - *

- *
-b The broker URL. Optional. - *
-h The virtual host. Optional. - *
-n The test client name. Optional. - *
name=value Trailing argument define name/value pairs. Added to system properties. Optional. - *
- * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - // Override the default broker url to be localhost:5672. - testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); - - // Use the command line parser to evaluate the command line with standard handling behaviour (print errors - // and usage then exist if there are errors). - // Any options and trailing name=value pairs are also injected into the test context properties object, - // to override any defaults that may have been set up. - ParsedProperties options = - new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args, - new uk.co.thebadgerset.junit.extensions.util.CommandLineParser( - new String[][] - { - { "b", "The broker URL.", "broker", "false" }, - { "h", "The virtual host to use.", "virtual host", "false" }, - { "o", "The name of the directory to output test timings to.", "dir", "false" }, - { "n", "The name of the test client.", "name", "false" }, - { "j", "Join this test client to running test.", "false" } - }), testContextProperties)); - - // Extract the command line options. - String brokerUrl = options.getProperty("b"); - String virtualHost = options.getProperty("h"); - String clientName = options.getProperty("n"); - boolean join = options.getPropertyAsBoolean("j"); - - // Create a test client and start it running. - TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName, join); - - // Use a class path scanner to find all the interop test case implementations. - // Hard code the test classes till the classpath scanner is fixed. - Collection> testCaseClasses = - new ArrayList>(); - // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); - Collections.addAll(testCaseClasses, TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class, - SustainedClientTestCase.class); - - try - { - client.start(testCaseClasses); - } - catch (Exception e) - { - log.error("The test client was unable to start.", e); - System.exit(1); - } - } - - /** - * Starts the interop test client running. This causes it to start listening for incoming test invites. - * - * @param testCaseClasses The classes of the available test cases. The test case names from these are used to - * matchin incoming test invites against. - * - * @throws JMSException Any underlying JMSExceptions are allowed to fall through. - */ - protected void start(Collection> testCaseClasses) throws JMSException - { - log.debug("private void start(): called"); - - // Create all the test case implementations and index them by the test names. - for (Class nextClass : testCaseClasses) - { - try - { - InteropClientTestCase testCase = nextClass.newInstance(); - testCases.put(testCase.getName(), testCase); - } - catch (InstantiationException e) - { - log.warn("Could not instantiate test case class: " + nextClass.getName(), e); - // Ignored. - } - catch (IllegalAccessException e) - { - log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); - // Ignored. - } - } - - // Open a connection to communicate with the coordinator on. - connection = TestUtils.createConnection(testContextProperties); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Set this up to listen for control messages. - Topic privateControlTopic = session.createTopic("iop.control." + clientName); - MessageConsumer consumer = session.createConsumer(privateControlTopic); - consumer.setMessageListener(this); - - Topic controlTopic = session.createTopic("iop.control"); - MessageConsumer consumer2 = session.createConsumer(controlTopic); - consumer2.setMessageListener(this); - - // Create a producer to send replies with. - producer = session.createProducer(null); - - // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client - // is available to join the current test case, if it supports it. This message may be ignored, or it may result - // in this test client receiving a test invite. - if (join) - { - Message joinMessage = session.createMessage(); - - joinMessage.setStringProperty("CONTROL_TYPE", "JOIN"); - joinMessage.setStringProperty("CLIENT_NAME", clientName); - joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); - producer.send(controlTopic, joinMessage); - } - - // Start listening for incoming control messages. - connection.start(); - } - - /** - * Handles all incoming control messages. - * - * @param message The incoming message. - */ - public void onMessage(Message message) - { - log.debug("public void onMessage(Message message = " + message + "): called"); - - try - { - String controlType = message.getStringProperty("CONTROL_TYPE"); - String testName = message.getStringProperty("TEST_NAME"); - - log.info("onMessage(Message message = " + message + "): for '" + controlType + "' to '" + testName + "'"); - - // Check if the message is a test invite. - if ("INVITE".equals(controlType)) - { - // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites - // for which test cases exist. - boolean enlist = false; - - if (testName != null) - { - log.debug("Got an invite to test: " + testName); - - // Check if the requested test case is available. - InteropClientTestCase testCase = testCases.get(testName); - - if (testCase != null) - { - // Make the requested test case the current test case. - currentTestCase = testCase; - enlist = true; - } - else - { - log.debug("Received an invite to the test '" + testName + "' but this test is not known."); - } - } - else - { - log.debug("Got a compulsory invite."); - - enlist = true; - } - - if (enlist) - { - // Reply with the client name in an Enlist message. - Message enlistMessage = session.createMessage(); - enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); - enlistMessage.setStringProperty("CLIENT_NAME", clientName); - enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); - enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); - - log.info("Sending Message '" + enlistMessage + "'. to " + message.getJMSReplyTo()); - - producer.send(message.getJMSReplyTo(), enlistMessage); - } - } - else if ("ASSIGN_ROLE".equals(controlType)) - { - // Assign the role to the current test case. - String roleName = message.getStringProperty("ROLE"); - - log.debug("Got a role assignment to role: " + roleName); - - InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName); - - currentTestCase.assignRole(role, message); - - // Reply by accepting the role in an Accept Role message. - Message acceptRoleMessage = session.createMessage(); - acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); - acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); - - producer.send(message.getJMSReplyTo(), acceptRoleMessage); - } - else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) - { - if ("START".equals(controlType)) - { - log.debug("Got a start notification."); - - // Start the current test case. - currentTestCase.start(); - } - else - { - log.debug("Got a status request."); - } - - // Generate the report from the test case and reply with it as a Report message. - Message reportMessage = currentTestCase.getReport(session); - reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); - reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); - - producer.send(message.getJMSReplyTo(), reportMessage); - } - else if ("TERMINATE".equals(controlType)) - { - log.info("Received termination instruction from coordinator."); - - // Is a cleaner shutdown needed? - connection.close(); - System.exit(0); - } - else - { - // Log a warning about this but otherwise ignore it. - log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); - } - } - catch (JMSException e) - { - // Log a warning about this, but otherwise ignore it. - log.warn("A JMSException occurred whilst handling a message."); - log.debug("Got JMSException whilst handling message: " + message, e); - } - } -} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java new file mode 100644 index 0000000000..bad49060ca --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java @@ -0,0 +1,234 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.File; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; + +/** + * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names + * that match a regular expression. + * + *

In order to test whether a class implements an interface or extends a class, the class must be loaded (unless + * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath, + * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this + * reason, the collector should always be used with a regular expression, through which the class file names are + * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that + * end with the keyword "Test" then use the regular expression "Test$" to match this. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Find all classes matching type and name pattern on the classpath. + *
+ * + * @todo Add logic to scan jars as well as directories. + */ +public class ClasspathScanner +{ + private static final Logger log = Logger.getLogger(ClasspathScanner.class); + + /** + * Scans the classpath and returns all classes that extend a specified class and match a specified name. + * There is an flag that can be used to indicate that only Java Beans will be matched (that is, only those classes + * that have a default constructor). + * + * @param matchingClass The class or interface to match. + * @param matchingRegexp The regular expression to match against the class name. + * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched. + * + * @return All the classes that match this collector. + */ + public static Collection> getMatches(Class matchingClass, String matchingRegexp, + boolean beanOnly) + { + log.debug("public static Collection> getMatches(Class matchingClass = " + matchingClass + + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called"); + + // Build a compiled regular expression from the pattern to match. + Pattern matchPattern = Pattern.compile(matchingRegexp); + + String classPath = System.getProperty("java.class.path"); + Map> result = new HashMap>(); + + log.debug("classPath = " + classPath); + + // Find matching classes starting from all roots in the classpath. + for (String path : splitClassPath(classPath)) + { + gatherFiles(new File(path), "", result, matchPattern, matchingClass); + } + + return result.values(); + } + + /** + * Finds all matching classes rooted at a given location in the file system. If location is a directory it + * is recursively examined. + * + * @param classRoot The root of the current point in the file system being examined. + * @param classFileName The name of the current file or directory to examine. + * @param result The accumulated mapping from class names to classes that match the scan. + * + * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with + * iteration. + */ + private static void gatherFiles(File classRoot, String classFileName, Map> result, + Pattern matchPattern, Class matchClass) + { + log.debug("private static void gatherFiles(File classRoot = " + classRoot + ", String classFileName = " + + classFileName + ", Map> result, Pattern matchPattern = " + matchPattern + + ", Class matchClass = " + matchClass + "): called"); + + File thisRoot = new File(classRoot, classFileName); + + // If the current location is a file, check if it is a matching class. + if (thisRoot.isFile()) + { + // Check that the file has a matching name. + if (matchesName(thisRoot.getName(), matchPattern)) + { + String className = classNameFromFile(thisRoot.getName()); + + // Check that the class has matching type. + try + { + Class candidateClass = Class.forName(className); + + Class matchedClass = matchesClass(candidateClass, matchClass); + + if (matchedClass != null) + { + result.put(className, matchedClass); + } + } + catch (ClassNotFoundException e) + { + // Ignore this. The matching class could not be loaded. + log.debug("Got ClassNotFoundException, ignoring.", e); + } + } + + return; + } + // Otherwise the current location is a directory, so examine all of its contents. + else + { + String[] contents = thisRoot.list(); + + if (contents != null) + { + for (String content : contents) + { + gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass); + } + } + } + } + + /** + * Checks if the specified class file name corresponds to a class with name matching the specified regular expression. + * + * @param classFileName The class file name. + * @param matchPattern The regular expression pattern to match. + * + * @return true if the class name matches, false otherwise. + */ + private static boolean matchesName(String classFileName, Pattern matchPattern) + { + String className = classNameFromFile(classFileName); + Matcher matcher = matchPattern.matcher(className); + + return matcher.matches(); + } + + /** + * Checks if the specified class to compare extends the base class being scanned for. + * + * @param matchingClass The base class to match against. + * @param toMatch The class to match against the base class. + * + * @return The class to check, cast as an instance of the class to match if the class extends the base class, or + * null otherwise. + */ + private static Class matchesClass(Class matchingClass, Class toMatch) + { + try + { + return matchingClass.asSubclass(toMatch); + } + catch (ClassCastException e) + { + return null; + } + } + + /** + * Takes a classpath (which is a series of paths) and splits it into its component paths. + * + * @param classPath The classpath to split. + * + * @return A list of the component paths that make up the class path. + */ + private static List splitClassPath(String classPath) + { + List result = new LinkedList(); + String separator = System.getProperty("path.separator"); + StringTokenizer tokenizer = new StringTokenizer(classPath, separator); + + while (tokenizer.hasMoreTokens()) + { + result.add(tokenizer.nextToken()); + } + + return result; + } + + /** + * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash + * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending. + * + * @param classFileName The filename of the class to translate to a class name. + * + * @return The fully qualified class name. + */ + private static String classNameFromFile(String classFileName) + { + log.debug("private static String classNameFromFile(String classFileName = " + classFileName + "): called"); + + // Remove the .class ending. + String s = classFileName.substring(0, classFileName.length() - ".class".length()); + + // Turn / seperators in . seperators. + String s2 = s.replace(File.separatorChar, '.'); + + // Knock off any leading . caused by a leading /. + if (s2.startsWith(".")) + { + return s2.substring(1); + } + + return s2; + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java new file mode 100644 index 0000000000..0090bec3d0 --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java @@ -0,0 +1,479 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import org.apache.log4j.Logger; + +import javax.jms.*; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation + * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant + * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids. + * + *

One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a + * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation + * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order + * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded + * conversation (the conversation methods can be called many times in parallel): + * + *

+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ *  try {
+ *   // Exchange greetings.
+ *   conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ *   Message greeting = conversation.receive();
+ *
+ *   // Exchange goodbyes.
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *   Message goodbye = conversation.receive();
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ *                                                          java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ *   try {
+ *   // Exchange greetings.
+ *   Message greeting = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ *   // Exchange goodbyes.
+ *   Message goodbye = conversation.receive();
+ *   conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ * 
+ * + *

Conversation correlation id's are generated on a per thread basis. + * + *

The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS + * sessions are not multi-threaded. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Associate messages to an ongoing conversation using correlation ids. + *
Auto manage sessions for conversations. + *
Store messages not in a conversation in dead letter box. + *
+ */ +public class ConversationFactory +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(ConversationFactory.class); + + /** Holds a map from correlation id's to queues. */ + private Map> idsToQueues = new HashMap>(); + + /** Holds the connection over which the conversation is conducted. */ + private Connection connection; + + /** Holds the session over which the conversation is conduxted. */ + private Session session; + + /** The message consumer for incoming messages. */ + MessageConsumer consumer; + + /** The message producer for outgoing messages. */ + MessageProducer producer; + + /** The well-known or temporary destination to receive replies on. */ + Destination receiveDestination; + + /** Holds the queue implementation class for the reply queue. */ + Class queueClass; + + /** Used to hold any replies that are received outside of the context of a conversation. */ + BlockingQueue deadLetterBox = new LinkedBlockingQueue(); + + /* Used to hold conversation state on a per thread basis. */ + /* + ThreadLocal threadLocals = + new ThreadLocal() + { + protected Conversation initialValue() + { + Conversation settings = new Conversation(); + settings.conversationId = conversationIdGenerator.getAndIncrement(); + + return settings; + } + }; + */ + + /** Generates new coversation id's as needed. */ + AtomicLong conversationIdGenerator = new AtomicLong(); + + /** + * Creates a conversation helper on the specified connection with the default sending destination, and listening + * to the specified receiving destination. + * + * @param connection The connection to build the conversation helper on. + * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary + * queue. + * @param queueClass The queue implementation class. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public ConversationFactory(Connection connection, Destination receiveDestination, + Class queueClass) throws JMSException + { + log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination + + ", Class queueClass = " + queueClass + "): called"); + + this.connection = connection; + this.queueClass = queueClass; + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Check if a well-known receive destination has been provided, or use a temporary queue if not. + this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue(); + + consumer = session.createConsumer(receiveDestination); + producer = session.createProducer(null); + + consumer.setMessageListener(new Receiver()); + } + + /** + * Creates a new conversation context. + * + * @return A new conversation context. + */ + public Conversation startConversation() + { + log.debug("public Conversation startConversation(): called"); + + Conversation conversation = new Conversation(); + conversation.conversationId = conversationIdGenerator.getAndIncrement(); + + return conversation; + } + + /** + * Ensures that the reply queue for a conversation exists. + * + * @param conversationId The conversation correlation id. + */ + private void initQueueForId(long conversationId) + { + if (!idsToQueues.containsKey(conversationId)) + { + idsToQueues.put(conversationId, ReflectionUtils.newInstance(queueClass)); + } + } + + /** + * Clears the dead letter box, returning all messages that were in it. + * + * @return All messages in the dead letter box. + */ + public Collection emptyDeadLetterBox() + { + log.debug("public Collection emptyDeadLetterBox(): called"); + + Collection result = new ArrayList(); + deadLetterBox.drainTo(result); + + return result; + } + + /** + * Gets the session over which the conversation is conducted. + * + * @return The session over which the conversation is conducted. + */ + public Session getSession() + { + // Conversation settings = threadLocals.get(); + + return session; + } + + /** + * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply + * destination automatically updated to the last received reply-to destination. + */ + public class Conversation + { + /** Holds the correlation id for the context. */ + long conversationId; + + /** + * Holds the send destination for the context. This will automatically be updated to the most recently received + * reply-to destination. + */ + Destination sendDestination; + + /** + * Sends a message to the default sending location. The correlation id of the message will be assigned by this + * method, overriding any previously set value. + * + * @param sendDestination The destination to send to. This may be null to use the last received reply-to + * destination. + * @param message The message to send. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no + * send destination is specified and there is no most recent reply-to destination available + * to use. + */ + public void send(Destination sendDestination, Message message) throws JMSException + { + log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message + + "): called"); + + // Conversation settings = threadLocals.get(); + // long conversationId = conversationId; + message.setJMSCorrelationID(Long.toString(conversationId)); + message.setJMSReplyTo(receiveDestination); + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + // Check if an overriding send to destination has been set or use the last reply-to if not. + Destination sendTo = null; + + if (sendDestination != null) + { + sendTo = sendDestination; + } + else if (sendDestination != null) + { + sendTo = sendDestination; + } + else + { + throw new JMSException("The send destination was specified, and no most recent reply-to available to use."); + } + + // Send the message. + synchronized (this) + { + producer.send(sendTo, message); + } + } + + /** + * Gets the next message in an ongoing conversation. This method may block until such a message is received. + * + * @return The next incoming message in the conversation. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message + * did not have its reply-to destination set up. + */ + public Message receive() throws JMSException + { + log.debug("public Message receive(): called"); + + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + BlockingQueue queue = idsToQueues.get(conversationId); + + try + { + Message result = queue.take(); + + // Keep the reply-to destination to send replies to. + sendDestination = result.getJMSReplyTo(); + + return result; + } + catch (InterruptedException e) + { + return null; + } + } + + /** + * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are + * received they will be returned. If a timeout is specified, then all messages up to the limit, received within + * that timespan will be returned. At least one of the message count or timeout should be set to a value of + * 1 or greater. + * + * @param num The number of messages to receive, or all if this is less than 1. + * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1. + * + * @return All messages received within the count limit and the timeout. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public Collection receiveAll(int num, long timeout) throws JMSException + { + log.debug("public Collection receiveAll(int num = " + num + ", long timeout = " + timeout + + "): called"); + + // Check that a timeout or message count was set. + if ((num < 1) && (timeout < 1)) + { + throw new IllegalArgumentException("At least one of message count (num) or timeout must be set."); + } + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + BlockingQueue queue = idsToQueues.get(conversationId); + + // Used to collect the received messages in. + Collection result = new ArrayList(); + + // Used to indicate when the timeout or message count has expired. + boolean receiveMore = true; + + int messageCount = 0; + + // Receive messages until the timeout or message count expires. + do + { + try + { + Message next = null; + + // Try to receive the message with a timeout if one has been set. + if (timeout > 0) + { + next = queue.poll(timeout, TimeUnit.MILLISECONDS); + + // Check if the timeout expired, and stop receiving if so. + if (next == null) + { + receiveMore = false; + } + } + // Receive the message without a timeout. + else + { + next = queue.take(); + } + + // Increment the message count if a message was received. + messageCount += (next != null) ? 1 : 0; + + // Check if all the requested messages were received, and stop receiving if so. + if ((num > 0) && (messageCount >= num)) + { + receiveMore = false; + } + + // Keep the reply-to destination to send replies to. + sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination; + + if (next != null) + { + result.add(next); + } + } + catch (InterruptedException e) + { + // Restore the threads interrupted status. + Thread.currentThread().interrupt(); + + // Stop receiving but return the messages received so far. + receiveMore = false; + } + } + while (receiveMore); + + return result; + } + + /** + * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any + * incoming messages using them will go to the dead letter box. + */ + public void end() + { + log.debug("public void end(): called"); + + // Ensure that the thread local for the current thread is cleaned up. + // Conversation settings = threadLocals.get(); + // long conversationId = settings.conversationId; + // threadLocals.remove(); + + // Ensure that its queue is removed from the queue map. + BlockingQueue queue = idsToQueues.remove(conversationId); + + // Move any outstanding messages on the threads conversation id into the dead letter box. + queue.drainTo(deadLetterBox); + } + } + + /** + * Implements the message listener for this conversation handler. + */ + protected class Receiver implements MessageListener + { + /** + * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id + * and placed into queues. + * + * @param message The incoming message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + try + { + Long conversationId = Long.parseLong(message.getJMSCorrelationID()); + + // Find the converstaion queue to place the message on. If there is no conversation for the message id, + // the the dead letter box queue is used. + BlockingQueue queue = idsToQueues.get(conversationId); + queue = (queue == null) ? deadLetterBox : queue; + + queue.put(message); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties b/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties new file mode 100644 index 0000000000..a5fb611dfa --- /dev/null +++ b/java/integrationtests/src/resources/org/apache/qpid/interop/connection.properties @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory +connectionfactory.broker = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672' -- cgit v1.2.1