From a249dbcb7d2d3211ef3f5a32b49a867ab5e9407c Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 15 Mar 2007 10:12:57 +0000 Subject: Commit of interop test stuff prior to M2 branch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518559 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/util/PropertiesUtils.java | 199 +++++++++++++++ .../java/org/apache/qpid/util/ReflectionUtils.java | 228 +++++++++++++++++ .../apache/qpid/util/ReflectionUtilsException.java | 44 ++++ .../interop/coordinator/CoordinatingTestCase.java | 7 +- .../interop/coordinator/TestClientDetails.java | 2 + .../apache/qpid/interop/testclient/TestClient.java | 269 +++++++++++++++------ .../org/apache/qpid/util/ClasspathScanner.java | 121 +++++++-- .../org/apache/qpid/util/ConversationHelper.java | 144 +++++++++-- 8 files changed, 898 insertions(+), 116 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/util/PropertiesUtils.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java create mode 100644 java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java (limited to 'java') diff --git a/java/common/src/main/java/org/apache/qpid/util/PropertiesUtils.java b/java/common/src/main/java/org/apache/qpid/util/PropertiesUtils.java new file mode 100644 index 0000000000..aa21841256 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/PropertiesUtils.java @@ -0,0 +1,199 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.log4j.Logger; + +/** + * PropertiesHelper defines some static methods which are useful when working with properties + * files. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Read properties from an input stream + *
Read properties from a file + *
Read properties from a URL + *
Read properties given a path to a file + *
Trim any whitespace from property values + *
+ */ +public class PropertiesUtils +{ + /** Used for logging. */ + private static final Logger log = Logger.getLogger(PropertiesUtils.class); + + /** + * Get properties from an input stream. + * + * @param is The input stream. + * + * @return The properties loaded from the input stream. + * + * @throws IOException If the is an I/O error reading from the stream. + */ + public static Properties getProperties(InputStream is) throws IOException + { + log.debug("getProperties(InputStream): called"); + + // Create properties object laoded from input stream + Properties properties = new Properties(); + + properties.load(is); + + return properties; + } + + /** + * Get properties from a file. + * + * @param file The file. + * + * @return The properties loaded from the file. + * + * @throws IOException If there is an I/O error reading from the file. + */ + public static Properties getProperties(File file) throws IOException + { + log.debug("getProperties(File): called"); + + // Open the file as an input stream + InputStream is = new FileInputStream(file); + + // Create properties object loaded from the stream + Properties properties = getProperties(is); + + // Close the file + is.close(); + + return properties; + } + + /** + * Get properties from a url. + * + * @param url The URL. + * + * @return The properties loaded from the url. + * + * @throws IOException If there is an I/O error reading from the URL. + */ + public static Properties getProperties(URL url) throws IOException + { + log.debug("getProperties(URL): called"); + + // Open the URL as an input stream + InputStream is = url.openStream(); + + // Create properties object loaded from the stream + Properties properties = getProperties(is); + + // Close the url + is.close(); + + return properties; + } + + /** + * Get properties from a path name. The path name may refer to either a file or a URL. + * + * @param pathname The path name. + * + * @return The properties loaded from the file or URL. + * + * @throws IOException If there is an I/O error reading from the URL or file named by the path. + */ + public static Properties getProperties(String pathname) throws IOException + { + log.debug("getProperties(String): called"); + + // Check that the path is not null + if (pathname == null) + { + return null; + } + + // Check if the path is a URL + if (isURL(pathname)) + { + // The path is a URL + return getProperties(new URL(pathname)); + } + else + { + // Assume the path is a file name + return getProperties(new File(pathname)); + } + } + + /** + * Trims whitespace from property values. This method returns a new set of properties + * the same as the properties specified as an argument but with any white space removed by + * the {@link java.lang.String#trim} method. + * + * @param properties The properties to trim whitespace from. + * + * @return The white space trimmed properties. + */ + public static Properties trim(Properties properties) + { + Properties trimmedProperties = new Properties(); + + // Loop over all the properties + for (Iterator i = properties.keySet().iterator(); i.hasNext();) + { + String next = (String) i.next(); + String nextValue = properties.getProperty(next); + + // Trim the value if it is not null + if (nextValue != null) + { + nextValue.trim(); + } + + // Store the trimmed value in the trimmed properties + trimmedProperties.setProperty(next, nextValue); + } + + return trimmedProperties; + } + + /** + * Helper method. Guesses whether a string is a URL or not. A String is considered to be a url if it begins with + * http:, ftp:, or uucp:. + * + * @param name The string to test for being a URL. + * + * @return True if the string is a URL and false if not. + */ + private static boolean isURL(String name) + { + return (name.toLowerCase().startsWith("http:") || name.toLowerCase().startsWith("ftp:") + || name.toLowerCase().startsWith("uucp:")); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java new file mode 100644 index 0000000000..724f504474 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtils.java @@ -0,0 +1,228 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * Provides helper methods for operating on classes and methods using reflection. Reflection methods tend to return + * a lot of checked exception so writing code to use them can be tedious and harder to read, especially when such errors + * are not expected to occur. This class always works with {@link ReflectionUtilsException}, which is a runtime exception, + * to wrap the checked exceptions raised by the standard Java reflection methods. Code using it does not normally + * expect these errors to occur, usually does not have a recovery mechanism for them when they do, but is cleaner, + * quicker to write and easier to read in the majority of cases. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Look up Classes by name. + *
Instantiate Classes by no-arg constructor. + *
+ */ +public class ReflectionUtils +{ + /** + * Gets the Class object for a named class. + * + * @param className The class to get the Class object for. + * + * @return The Class object for the named class. + */ + public static Class forName(String className) + { + try + { + return Class.forName(className); + } + catch (ClassNotFoundException e) + { + throw new ReflectionUtilsException("ClassNotFoundException whilst finding class.", e); + } + } + + /** + * Creates an instance of a Class, instantiated through its no-args constructor. + * + * @param cls The Class to instantiate. + * @param The Class type. + * + * @return An instance of the class. + */ + public static T newInstance(Class cls) + { + try + { + return cls.newInstance(); + } + catch (InstantiationException e) + { + throw new ReflectionUtilsException("InstantiationException whilst instantiating class.", e); + } + catch (IllegalAccessException e) + { + throw new ReflectionUtilsException("IllegalAccessException whilst instantiating class.", e); + } + } + + /** + * Calls a named method on an object with a specified set of parameters, any Java access modifier are overridden. + * + * @param o The object to call. + * @param method The method name to call. + * @param params The parameters to pass. + * @param paramClasses The argument types. + * + * @return The return value from the method call. + */ + public static Object callMethodOverridingIllegalAccess(Object o, String method, Object[] params, Class[] paramClasses) + { + // Get the objects class. + Class cls = o.getClass(); + + // Get the classes of the parameters. + /*Class[] paramClasses = new Class[params.length]; + + for (int i = 0; i < params.length; i++) + { + paramClasses[i] = params[i].getClass(); + }*/ + + try + { + // Try to find the matching method on the class. + Method m = cls.getDeclaredMethod(method, paramClasses); + + // Make it accessible. + m.setAccessible(true); + + // Invoke it with the parameters. + return m.invoke(o, params); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException(e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException(e); + } + } + + /** + * Calls a named method on an object with a specified set of parameters. + * + * @param o The object to call. + * @param method The method name to call. + * @param params The parameters to pass. + * + * @return The return value from the method call. + */ + public static Object callMethod(Object o, String method, Object[] params) + { + // Get the objects class. + Class cls = o.getClass(); + + // Get the classes of the parameters. + Class[] paramClasses = new Class[params.length]; + + for (int i = 0; i < params.length; i++) + { + paramClasses[i] = params[i].getClass(); + } + + try + { + // Try to find the matching method on the class. + Method m = cls.getMethod(method, paramClasses); + + // Invoke it with the parameters. + return m.invoke(o, params); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException(e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException(e); + } + } + + /** + * Calls a constuctor witht the specified arguments. + * + * @param constructor The constructor. + * @param args The arguments. + * @param The Class type. + * + * @return An instance of the class that the constructor is for. + */ + public static T newInstance(Constructor constructor, Object[] args) + { + try + { + return constructor.newInstance(args); + } + catch (InstantiationException e) + { + throw new RuntimeException(e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException(e); + } + } + + /** + * Gets the constructor of a class that takes the specified set of arguments if any matches. If no matching + * constructor is found then a runtime exception is raised. + * + * @param cls The class to get a constructor from. + * @param args The arguments to match. + * @param The class type. + * + * @return The constructor. + */ + public static Constructor getConstructor(Class cls, Class[] args) + { + try + { + return cls.getConstructor(args); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java new file mode 100644 index 0000000000..20499641ac --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/ReflectionUtilsException.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +/** + * Wraps a checked exception that occurs when {@link ReflectionUtils} encounters checked exceptions using standard + * Java reflection methods. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Wrap a checked reflection exception. + *
+ */ +public class ReflectionUtilsException extends RuntimeException +{ + /** + * Creates a runtime reflection exception, from a checked one. + * + * @param message The message. + * @param cause The causing exception. + */ + public ReflectionUtilsException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 12faa64528..efeda78abf 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -67,8 +67,8 @@ public abstract class CoordinatingTestCase extends TestCase * @param allClients The list of all possible test clients that may accept the invitation. * @param testProperties The test case definition. */ - public CoordinatingTestCase(TestClientDetails sender, TestClientDetails receiver, - Collection allClients, Properties testProperties) + public void TestCase(TestClientDetails sender, TestClientDetails receiver, Collection allClients, + Properties testProperties) { } /** @@ -83,8 +83,7 @@ public abstract class CoordinatingTestCase extends TestCase * * @return The test results from the senders and receivers. */ - protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver, - Collection allParticipatingClients, Properties testProperties) + protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver, Properties testProperties) { // Check if the sender and recevier did not accept the invite to this test. { diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java index 3a201b6899..fcfb5a08fd 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -28,8 +28,10 @@ package org.apache.qpid.interop.coordinator; public class TestClientDetails { /** The test clients name. */ + public String clientName; /* The test clients unqiue sequence number. Not currently used. */ /** The routing key of the test clients control topic. */ + public String privateControlKey; } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 2c04a8e52b..a623687a0f 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -20,12 +20,22 @@ */ package org.apache.qpid.interop.testclient; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; -import javax.jms.Message; -import javax.jms.MessageListener; +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.PropertiesUtils; /** * Implements a test client as described in the interop testing spec @@ -49,76 +59,26 @@ import org.apache.qpid.util.CommandLineParser; */ public class TestClient implements MessageListener { + private static Logger log = Logger.getLogger(TestClient.class); + /** Holds the URL of the broker to run the tests on. */ String brokerUrl; /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ String virtualHost; - /** Defines an enumeration of the control message types and handling behaviour for each. */ - protected enum ControlMessages implements MessageListener - { - INVITE_COMPULSORY - { - public void onMessage(Message message) - { - // Reply with the client name in an Enlist message. - } - }, - INVITE - { - public void onMessage(Message message) - { - // Extract the test properties. - - // Check if the requested test case is available. - { - // Make the requested test case the current test case. + /** Holds all the test cases loaded from the classpath. */ + Map testCases = new HashMap(); - // Reply by accepting the invite in an Enlist message. - } - } - }, - ASSIGN_ROLE - { - public void onMessage(Message message) - { - // Extract the test properties. + InteropClientTestCase currentTestCase; - // Reply by accepting the role in an Accept Role message. - } - }, - START - { - public void onMessage(Message message) - { - // Start the current test case. + public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; + public static final String CONNECTION_NAME = "broker"; + public static final String CLIENT_NAME = "java"; + public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/client/connection.properties"; - // Generate the report from the test case and reply with it as a Report message. - } - }, - STATUS_REQUEST - { - public void onMessage(Message message) - { - // Generate the report from the test case and reply with it as a Report message. - } - }, - UNKNOWN - { - public void onMessage(Message message) - { - // Log a warning about this but otherwise ignore it. - } - }; - - /** - * Handles control messages appropriately depending on the message type. - * - * @param message The incoming message to handle. - */ - public abstract void onMessage(Message message); - } + private MessageProducer producer; + private Session session; public TestClient(String brokerUrl, String virtualHost) { @@ -172,42 +132,199 @@ public class TestClient implements MessageListener // Create a test client and start it running. TestClient client = new TestClient(brokerUrl, virtualHost); - client.start(); + + try + { + client.start(); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + System.exit(1); + } } - private void start() + private void start() throws JMSException { // Use a class path scanner to find all the interop test case implementations. + Collection> testCaseClasses = + ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); // Create all the test case implementations and index them by the test names. + for (Class nextClass : testCaseClasses) + { + try + { + InteropClientTestCase testCase = nextClass.newInstance(); + testCases.put(testCase.getName(), testCase); + } + catch (InstantiationException e) + { + log.warn("Could not instantiate test case class: " + nextClass.getName(), e); + // Ignored. + } + catch (IllegalAccessException e) + { + log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); + // Ignored. + } + } // Open a connection to communicate with the coordinator on. + Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Set this up to listen for control messages. + MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + CLIENT_NAME)); + consumer.setMessageListener(this); // Create a producer to send replies with. + producer = session.createProducer(null); + + // Start listening for incoming control messages. + connection.start(); } /** - * Handles all incoming control messages. + * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate + * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure + * handler. * - * @param message The incoming message. + * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it + * to a Utils library class. + * + * @param connectionPropsResource The name of the connection properties file. + * @param brokerUrl The broker url to connect to, null to use the default from the properties. + * @param virtualHost The virtual host to connectio to, null to use the default. + * + * @return A JMS conneciton. */ - public void onMessage(Message message) + private static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) { - // Delegate the message handling to the message type specific handler. - extractMessageType(message).onMessage(message); + try + { + Properties connectionProps = + PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream( + connectionPropsResource)); + + String connectionString = + "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; + connectionProps.setProperty(CONNECTION_PROPERTY, connectionString); + + Context ctx = new InitialContext(connectionProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + Connection connection = cf.createConnection(); + + return connection; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (NamingException e) + { + throw new RuntimeException(e); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } } /** - * Determines the control messsage type of incoming messages. - * - * @param message The message to determine the type of. + * Handles all incoming control messages. * - * @return The control message type of the message. + * @param message The incoming message. */ - protected ControlMessages extractMessageType(Message message) + public void onMessage(Message message) { - return null; + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + String testName = message.getStringProperty("TEST_NAME"); + + // Check if the message is a test invite. + if ("INVITE".equals(controlType)) + { + String testCaseName = message.getStringProperty("TEST_NAME"); + + // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites + // for which test cases exist. + boolean enlist = false; + + if (testCaseName != null) + { + // Check if the requested test case is available. + InteropClientTestCase testCase = testCases.get(testCaseName); + + if (testCase != null) + { + // Make the requested test case the current test case. + currentTestCase = testCase; + enlist = true; + } + } + else + { + enlist = true; + } + + if (enlist) + { + // Reply with the client name in an Enlist message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); + enlistMessage.setStringProperty("CLIENT_NAME", CLIENT_NAME); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + CLIENT_NAME); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + } + else if ("ASSIGN_ROLE".equals(controlType)) + { + // Assign the role to the current test case. + String roleName = message.getStringProperty(""); + InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName); + + currentTestCase.assignRole(role, message); + + // Reply by accepting the role in an Accept Role message. + Message acceptRoleMessage = session.createMessage(); + acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); + acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), acceptRoleMessage); + } + else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) + { + if ("START".equals(controlType)) + { + // Start the current test case. + currentTestCase.start(); + } + + // Generate the report from the test case and reply with it as a Report message. + Message reportMessage = currentTestCase.getReport(session); + reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); + reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), reportMessage); + } + else + { + // Log a warning about this but otherwise ignore it. + log.warn("Got an unknown control message: " + message); + } + } + catch (JMSException e) + { + // Log a warning about this, but otherwise ignore it. + log.warn("A JMSException occurred whilst handling a message."); + log.debug("Got JMSException whilst handling message: " + message, e); + } } } diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java index 1dd00da53b..35946e6c4e 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java @@ -22,6 +22,10 @@ package org.apache.qpid.util; import java.io.File; import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; /** * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names @@ -38,10 +42,12 @@ import java.util.*; * Responsibilities Collaborations * Find all classes matching type and name pattern on the classpath. * + * + * @todo Add logic to scan jars as well as directories. */ public class ClasspathScanner { - static final int SUFFIX_LENGTH = ".class".length(); + private static final Logger log = Logger.getLogger(ClasspathScanner.class); /** * Scans the classpath and returns all classes that extend a specified class and match a specified name. @@ -49,58 +55,124 @@ public class ClasspathScanner * that have a default constructor). * * @param matchingClass The class or interface to match. - * @param matchingRegexp The reular expression to match against the class name. + * @param matchingRegexp The regular expression to match against the class name. * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched. * * @return All the classes that match this collector. */ - public static Collection> getMatches(Class matchingClass, String matchingRegexp, boolean beanOnly) + public static Collection> getMatches(Class matchingClass, String matchingRegexp, + boolean beanOnly) { + // Build a compiled regular expression from the pattern to match. + Pattern matchPattern = Pattern.compile(matchingRegexp); + String classPath = System.getProperty("java.class.path"); - Map result = new HashMap(); + Map> result = new HashMap>(); + // Find matching classes starting from all roots in the classpath. for (String path : splitClassPath(classPath)) { - gatherFiles(new File(path), "", result); + gatherFiles(new File(path), "", result, matchPattern, matchingClass); } return result.values(); } - private static void gatherFiles(File classRoot, String classFileName, Map result) + /** + * Finds all matching classes rooted at a given location in the file system. If location is a directory it + * is recursively examined. + * + * @param classRoot The root of the current point in the file system being examined. + * @param classFileName The name of the current file or directory to examine. + * @param result The accumulated mapping from class names to classes that match the scan. + * + * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with + * iteration. + */ + private static void gatherFiles(File classRoot, String classFileName, Map> result, + Pattern matchPattern, Class matchClass) { File thisRoot = new File(classRoot, classFileName); + // If the current location is a file, check if it is a matching class. if (thisRoot.isFile()) { - if (matchesName(classFileName)) + // Check that the file has a matching name. + if (matchesName(classFileName, matchPattern)) { String className = classNameFromFile(classFileName); - result.put(className, className); + + // Check that the class has matching type. + try + { + Class candidateClass = Class.forName(className); + + Class matchedClass = matchesClass(candidateClass, matchClass); + + if (matchedClass != null) + { + result.put(className, matchedClass); + } + } + catch (ClassNotFoundException e) + { + // Ignore this. The matching class could not be loaded. + log.debug("Got ClassNotFoundException, ignoring.", e); + } } return; } - - String[] contents = thisRoot.list(); - - if (contents != null) + // Otherwise the current location is a directory, so examine all of its contents. + else { - for (String content : contents) + String[] contents = thisRoot.list(); + + if (contents != null) { - gatherFiles(classRoot, classFileName + File.separatorChar + content, result); + for (String content : contents) + { + gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass); + } } } } - private static boolean matchesName(String classFileName) + /** + * Checks if the specified class file name corresponds to a class with name matching the specified regular expression. + * + * @param classFileName The class file name. + * @param matchPattern The regular expression pattern to match. + * + * @return true if the class name matches, false otherwise. + */ + private static boolean matchesName(String classFileName, Pattern matchPattern) { - return classFileName.endsWith(".class") && (classFileName.indexOf('$') < 0) && (classFileName.indexOf("Test") > 0); + String className = classNameFromFile(classFileName); + Matcher matcher = matchPattern.matcher(className); + + return matcher.matches(); } - private static boolean matchesInterface() + /** + * Checks if the specified class to compare extends the base class being scanned for. + * + * @param matchingClass The base class to match against. + * @param toMatch The class to match against the base class. + * + * @return The class to check, cast as an instance of the class to match if the class extends the base class, or + * null otherwise. + */ + private static Class matchesClass(Class matchingClass, Class toMatch) { - return false; + try + { + return matchingClass.asSubclass(toMatch); + } + catch (ClassCastException e) + { + return null; + } } /** @@ -125,17 +197,22 @@ public class ClasspathScanner } /** - * convert /a/b.class to a.b + * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash + * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending. * - * @param classFileName + * @param classFileName The filename of the class to translate to a class name. * - * @return + * @return The fully qualified class name. */ private static String classNameFromFile(String classFileName) { + // Remove the .class ending. + String s = classFileName.substring(0, classFileName.length() - ".class".length()); - String s = classFileName.substring(0, classFileName.length() - SUFFIX_LENGTH); + // Turn / seperators in . seperators. String s2 = s.replace(File.separatorChar, '.'); + + // Knock off any leading . caused by a leading /. if (s2.startsWith(".")) { return s2.substring(1); diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java index bda089045a..631cab9f35 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.util; -import java.util.Collection; +import java.util.*; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageListener; +import javax.jms.*; /** * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation @@ -89,7 +89,8 @@ import javax.jms.MessageListener; * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all * subsequent sends and receive at least until the transaction is committed. So a message selector must be used - * to restrict receives on that session to prevent it picking up messages bound for other conversations. + * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use + * a temporary response queue, with only that session listening to it. * * @todo Want something convenient that hides many details. Write out some example use cases to get the best feel for * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive @@ -101,6 +102,32 @@ import javax.jms.MessageListener; */ public class ConversationHelper { + /** Holds a map from correlation id's to queues. */ + private Map> idsToQueues = new HashMap>(); + + private Session session; + private MessageProducer producer; + private MessageConsumer consumer; + + Class> queueClass; + + BlockingQueue deadLetterBox = new LinkedBlockingQueue(); + + ThreadLocal threadLocals = + new ThreadLocal() + { + protected PerThreadSettings initialValue() + { + PerThreadSettings settings = new PerThreadSettings(); + settings.conversationId = conversationIdGenerator.getAndIncrement(); + + return settings; + } + }; + + /** Generates new coversation id's as needed. */ + AtomicLong conversationIdGenerator = new AtomicLong(); + /** * Creates a conversation helper on the specified connection with the default sending destination, and listening * to the specified receiving destination. @@ -109,19 +136,53 @@ public class ConversationHelper * @param sendDestination The default sending destiation for all messages. * @param receiveDestination The destination to listen to for incoming messages. * @param queueClass The queue implementation class. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. */ public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination, - Class queueClass) - { } + Class> queueClass) throws JMSException + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(sendDestination); + consumer = session.createConsumer(receiveDestination); + + consumer.setMessageListener(new Receiver()); + + this.queueClass = queueClass; + } /** * Sends a message to the default sending location. The correlation id of the message will be assigned by this * method, overriding any previously set value. * * @param message The message to send. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public void send(Message message) throws JMSException + { + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + message.setJMSCorrelationID(Long.toString(conversationId)); + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + producer.send(message); + } + + /** + * Ensures that the reply queue for a conversation exists. + * + * @param conversationId The conversation correlation id. */ - public void send(Message message) - { } + private void initQueueForId(long conversationId) + { + if (!idsToQueues.containsKey(conversationId)) + { + idsToQueues.put(conversationId, ReflectionUtils.>newInstance(queueClass)); + } + } /** * Gets the next message in an ongoing conversation. This method may block until such a message is received. @@ -130,7 +191,22 @@ public class ConversationHelper */ public Message receive() { - return null; + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + BlockingQueue queue = idsToQueues.get(conversationId); + + try + { + return queue.take(); + } + catch (InterruptedException e) + { + return null; + } } /** @@ -138,7 +214,18 @@ public class ConversationHelper * conversation are no longer valid, and any incoming messages using them will go to the dead letter box. */ public void end() - { } + { + // Ensure that the thread local for the current thread is cleaned up. + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + threadLocals.remove(); + + // Ensure that its queue is removed from the queue map. + BlockingQueue queue = idsToQueues.remove(conversationId); + + // Move any outstanding messages on the threads conversation id into the dead letter box. + queue.drainTo(deadLetterBox); + } /** * Clears the dead letter box, returning all messages that were in it. @@ -147,7 +234,10 @@ public class ConversationHelper */ public Collection emptyDeadLetterBox() { - return null; + Collection result = new LinkedList(); + deadLetterBox.drainTo(result); + + return result; } /** @@ -162,6 +252,32 @@ public class ConversationHelper * @param message The incoming message. */ public void onMessage(Message message) - { } + { + try + { + Long conversationId = Long.parseLong(message.getJMSCorrelationID()); + + // Find the converstaion queue to place the message on. If there is no conversation for the message id, + // the the dead letter box queue is used. + BlockingQueue queue = idsToQueues.get(conversationId); + queue = (queue == null) ? deadLetterBox : queue; + + queue.put(message); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + protected class PerThreadSettings + { + /** Holds the correlation id for the current threads conversation. */ + long conversationId; } } -- cgit v1.2.1