From 5495e89711087f5f5896ebc30e25dae222a72da0 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 15 Mar 2007 10:12:57 +0000 Subject: Commit of interop test stuff prior to M2 branch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@518559 13f79535-47bb-0310-9956-ffa450edef68 --- .../interop/coordinator/CoordinatingTestCase.java | 7 +- .../interop/coordinator/TestClientDetails.java | 2 + .../apache/qpid/interop/testclient/TestClient.java | 269 +++++++++++++++------ .../org/apache/qpid/util/ClasspathScanner.java | 121 +++++++-- .../org/apache/qpid/util/ConversationHelper.java | 144 +++++++++-- 5 files changed, 427 insertions(+), 116 deletions(-) (limited to 'qpid/java/integrationtests') diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java index 12faa64528..efeda78abf 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java @@ -67,8 +67,8 @@ public abstract class CoordinatingTestCase extends TestCase * @param allClients The list of all possible test clients that may accept the invitation. * @param testProperties The test case definition. */ - public CoordinatingTestCase(TestClientDetails sender, TestClientDetails receiver, - Collection allClients, Properties testProperties) + public void TestCase(TestClientDetails sender, TestClientDetails receiver, Collection allClients, + Properties testProperties) { } /** @@ -83,8 +83,7 @@ public abstract class CoordinatingTestCase extends TestCase * * @return The test results from the senders and receivers. */ - protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver, - Collection allParticipatingClients, Properties testProperties) + protected Object[] sequenceTest(TestClientDetails sender, TestClientDetails receiver, Properties testProperties) { // Check if the sender and recevier did not accept the invite to this test. { diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java index 3a201b6899..fcfb5a08fd 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/TestClientDetails.java @@ -28,8 +28,10 @@ package org.apache.qpid.interop.coordinator; public class TestClientDetails { /** The test clients name. */ + public String clientName; /* The test clients unqiue sequence number. Not currently used. */ /** The routing key of the test clients control topic. */ + public String privateControlKey; } diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index 2c04a8e52b..a623687a0f 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -20,12 +20,22 @@ */ package org.apache.qpid.interop.testclient; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; -import javax.jms.Message; -import javax.jms.MessageListener; +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import org.apache.log4j.Logger; + +import org.apache.qpid.util.ClasspathScanner; import org.apache.qpid.util.CommandLineParser; +import org.apache.qpid.util.PropertiesUtils; /** * Implements a test client as described in the interop testing spec @@ -49,76 +59,26 @@ import org.apache.qpid.util.CommandLineParser; */ public class TestClient implements MessageListener { + private static Logger log = Logger.getLogger(TestClient.class); + /** Holds the URL of the broker to run the tests on. */ String brokerUrl; /** Holds the virtual host to run the tests on. If null, then the default virtual host is used. */ String virtualHost; - /** Defines an enumeration of the control message types and handling behaviour for each. */ - protected enum ControlMessages implements MessageListener - { - INVITE_COMPULSORY - { - public void onMessage(Message message) - { - // Reply with the client name in an Enlist message. - } - }, - INVITE - { - public void onMessage(Message message) - { - // Extract the test properties. - - // Check if the requested test case is available. - { - // Make the requested test case the current test case. + /** Holds all the test cases loaded from the classpath. */ + Map testCases = new HashMap(); - // Reply by accepting the invite in an Enlist message. - } - } - }, - ASSIGN_ROLE - { - public void onMessage(Message message) - { - // Extract the test properties. + InteropClientTestCase currentTestCase; - // Reply by accepting the role in an Accept Role message. - } - }, - START - { - public void onMessage(Message message) - { - // Start the current test case. + public static final String CONNECTION_PROPERTY = "connectionfactory.broker"; + public static final String CONNECTION_NAME = "broker"; + public static final String CLIENT_NAME = "java"; + public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/client/connection.properties"; - // Generate the report from the test case and reply with it as a Report message. - } - }, - STATUS_REQUEST - { - public void onMessage(Message message) - { - // Generate the report from the test case and reply with it as a Report message. - } - }, - UNKNOWN - { - public void onMessage(Message message) - { - // Log a warning about this but otherwise ignore it. - } - }; - - /** - * Handles control messages appropriately depending on the message type. - * - * @param message The incoming message to handle. - */ - public abstract void onMessage(Message message); - } + private MessageProducer producer; + private Session session; public TestClient(String brokerUrl, String virtualHost) { @@ -172,42 +132,199 @@ public class TestClient implements MessageListener // Create a test client and start it running. TestClient client = new TestClient(brokerUrl, virtualHost); - client.start(); + + try + { + client.start(); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + System.exit(1); + } } - private void start() + private void start() throws JMSException { // Use a class path scanner to find all the interop test case implementations. + Collection> testCaseClasses = + ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true); // Create all the test case implementations and index them by the test names. + for (Class nextClass : testCaseClasses) + { + try + { + InteropClientTestCase testCase = nextClass.newInstance(); + testCases.put(testCase.getName(), testCase); + } + catch (InstantiationException e) + { + log.warn("Could not instantiate test case class: " + nextClass.getName(), e); + // Ignored. + } + catch (IllegalAccessException e) + { + log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); + // Ignored. + } + } // Open a connection to communicate with the coordinator on. + Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Set this up to listen for control messages. + MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + CLIENT_NAME)); + consumer.setMessageListener(this); // Create a producer to send replies with. + producer = session.createProducer(null); + + // Start listening for incoming control messages. + connection.start(); } /** - * Handles all incoming control messages. + * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple + * convenience method for code that does anticipate handling connection failures. All exceptions that indicate + * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure + * handler. * - * @param message The incoming message. + * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it + * to a Utils library class. + * + * @param connectionPropsResource The name of the connection properties file. + * @param brokerUrl The broker url to connect to, null to use the default from the properties. + * @param virtualHost The virtual host to connectio to, null to use the default. + * + * @return A JMS conneciton. */ - public void onMessage(Message message) + private static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost) { - // Delegate the message handling to the message type specific handler. - extractMessageType(message).onMessage(message); + try + { + Properties connectionProps = + PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream( + connectionPropsResource)); + + String connectionString = + "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'"; + connectionProps.setProperty(CONNECTION_PROPERTY, connectionString); + + Context ctx = new InitialContext(connectionProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + Connection connection = cf.createConnection(); + + return connection; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (NamingException e) + { + throw new RuntimeException(e); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } } /** - * Determines the control messsage type of incoming messages. - * - * @param message The message to determine the type of. + * Handles all incoming control messages. * - * @return The control message type of the message. + * @param message The incoming message. */ - protected ControlMessages extractMessageType(Message message) + public void onMessage(Message message) { - return null; + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + String testName = message.getStringProperty("TEST_NAME"); + + // Check if the message is a test invite. + if ("INVITE".equals(controlType)) + { + String testCaseName = message.getStringProperty("TEST_NAME"); + + // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites + // for which test cases exist. + boolean enlist = false; + + if (testCaseName != null) + { + // Check if the requested test case is available. + InteropClientTestCase testCase = testCases.get(testCaseName); + + if (testCase != null) + { + // Make the requested test case the current test case. + currentTestCase = testCase; + enlist = true; + } + } + else + { + enlist = true; + } + + if (enlist) + { + // Reply with the client name in an Enlist message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); + enlistMessage.setStringProperty("CLIENT_NAME", CLIENT_NAME); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + CLIENT_NAME); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + } + else if ("ASSIGN_ROLE".equals(controlType)) + { + // Assign the role to the current test case. + String roleName = message.getStringProperty(""); + InteropClientTestCase.Roles role = Enum.valueOf(InteropClientTestCase.Roles.class, roleName); + + currentTestCase.assignRole(role, message); + + // Reply by accepting the role in an Accept Role message. + Message acceptRoleMessage = session.createMessage(); + acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); + acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), acceptRoleMessage); + } + else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) + { + if ("START".equals(controlType)) + { + // Start the current test case. + currentTestCase.start(); + } + + // Generate the report from the test case and reply with it as a Report message. + Message reportMessage = currentTestCase.getReport(session); + reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); + reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + producer.send(message.getJMSReplyTo(), reportMessage); + } + else + { + // Log a warning about this but otherwise ignore it. + log.warn("Got an unknown control message: " + message); + } + } + catch (JMSException e) + { + // Log a warning about this, but otherwise ignore it. + log.warn("A JMSException occurred whilst handling a message."); + log.debug("Got JMSException whilst handling message: " + message, e); + } } } diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java index 1dd00da53b..35946e6c4e 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java @@ -22,6 +22,10 @@ package org.apache.qpid.util; import java.io.File; import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; /** * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names @@ -38,10 +42,12 @@ import java.util.*; * Responsibilities Collaborations * Find all classes matching type and name pattern on the classpath. * + * + * @todo Add logic to scan jars as well as directories. */ public class ClasspathScanner { - static final int SUFFIX_LENGTH = ".class".length(); + private static final Logger log = Logger.getLogger(ClasspathScanner.class); /** * Scans the classpath and returns all classes that extend a specified class and match a specified name. @@ -49,58 +55,124 @@ public class ClasspathScanner * that have a default constructor). * * @param matchingClass The class or interface to match. - * @param matchingRegexp The reular expression to match against the class name. + * @param matchingRegexp The regular expression to match against the class name. * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched. * * @return All the classes that match this collector. */ - public static Collection> getMatches(Class matchingClass, String matchingRegexp, boolean beanOnly) + public static Collection> getMatches(Class matchingClass, String matchingRegexp, + boolean beanOnly) { + // Build a compiled regular expression from the pattern to match. + Pattern matchPattern = Pattern.compile(matchingRegexp); + String classPath = System.getProperty("java.class.path"); - Map result = new HashMap(); + Map> result = new HashMap>(); + // Find matching classes starting from all roots in the classpath. for (String path : splitClassPath(classPath)) { - gatherFiles(new File(path), "", result); + gatherFiles(new File(path), "", result, matchPattern, matchingClass); } return result.values(); } - private static void gatherFiles(File classRoot, String classFileName, Map result) + /** + * Finds all matching classes rooted at a given location in the file system. If location is a directory it + * is recursively examined. + * + * @param classRoot The root of the current point in the file system being examined. + * @param classFileName The name of the current file or directory to examine. + * @param result The accumulated mapping from class names to classes that match the scan. + * + * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with + * iteration. + */ + private static void gatherFiles(File classRoot, String classFileName, Map> result, + Pattern matchPattern, Class matchClass) { File thisRoot = new File(classRoot, classFileName); + // If the current location is a file, check if it is a matching class. if (thisRoot.isFile()) { - if (matchesName(classFileName)) + // Check that the file has a matching name. + if (matchesName(classFileName, matchPattern)) { String className = classNameFromFile(classFileName); - result.put(className, className); + + // Check that the class has matching type. + try + { + Class candidateClass = Class.forName(className); + + Class matchedClass = matchesClass(candidateClass, matchClass); + + if (matchedClass != null) + { + result.put(className, matchedClass); + } + } + catch (ClassNotFoundException e) + { + // Ignore this. The matching class could not be loaded. + log.debug("Got ClassNotFoundException, ignoring.", e); + } } return; } - - String[] contents = thisRoot.list(); - - if (contents != null) + // Otherwise the current location is a directory, so examine all of its contents. + else { - for (String content : contents) + String[] contents = thisRoot.list(); + + if (contents != null) { - gatherFiles(classRoot, classFileName + File.separatorChar + content, result); + for (String content : contents) + { + gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass); + } } } } - private static boolean matchesName(String classFileName) + /** + * Checks if the specified class file name corresponds to a class with name matching the specified regular expression. + * + * @param classFileName The class file name. + * @param matchPattern The regular expression pattern to match. + * + * @return true if the class name matches, false otherwise. + */ + private static boolean matchesName(String classFileName, Pattern matchPattern) { - return classFileName.endsWith(".class") && (classFileName.indexOf('$') < 0) && (classFileName.indexOf("Test") > 0); + String className = classNameFromFile(classFileName); + Matcher matcher = matchPattern.matcher(className); + + return matcher.matches(); } - private static boolean matchesInterface() + /** + * Checks if the specified class to compare extends the base class being scanned for. + * + * @param matchingClass The base class to match against. + * @param toMatch The class to match against the base class. + * + * @return The class to check, cast as an instance of the class to match if the class extends the base class, or + * null otherwise. + */ + private static Class matchesClass(Class matchingClass, Class toMatch) { - return false; + try + { + return matchingClass.asSubclass(toMatch); + } + catch (ClassCastException e) + { + return null; + } } /** @@ -125,17 +197,22 @@ public class ClasspathScanner } /** - * convert /a/b.class to a.b + * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash + * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending. * - * @param classFileName + * @param classFileName The filename of the class to translate to a class name. * - * @return + * @return The fully qualified class name. */ private static String classNameFromFile(String classFileName) { + // Remove the .class ending. + String s = classFileName.substring(0, classFileName.length() - ".class".length()); - String s = classFileName.substring(0, classFileName.length() - SUFFIX_LENGTH); + // Turn / seperators in . seperators. String s2 = s.replace(File.separatorChar, '.'); + + // Knock off any leading . caused by a leading /. if (s2.startsWith(".")) { return s2.substring(1); diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java index bda089045a..631cab9f35 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.util; -import java.util.Collection; +import java.util.*; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageListener; +import javax.jms.*; /** * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation @@ -89,7 +89,8 @@ import javax.jms.MessageListener; * transactional mode, commits must happen before receiving, or no replies will come in. (unless there were some * pending on the queue?). Also, having received on a particular session, must ensure that session is used for all * subsequent sends and receive at least until the transaction is committed. So a message selector must be used - * to restrict receives on that session to prevent it picking up messages bound for other conversations. + * to restrict receives on that session to prevent it picking up messages bound for other conversations. Or use + * a temporary response queue, with only that session listening to it. * * @todo Want something convenient that hides many details. Write out some example use cases to get the best feel for * it. Pass in connection, send destination, receive destination. Provide endConvo, send, receive @@ -101,6 +102,32 @@ import javax.jms.MessageListener; */ public class ConversationHelper { + /** Holds a map from correlation id's to queues. */ + private Map> idsToQueues = new HashMap>(); + + private Session session; + private MessageProducer producer; + private MessageConsumer consumer; + + Class> queueClass; + + BlockingQueue deadLetterBox = new LinkedBlockingQueue(); + + ThreadLocal threadLocals = + new ThreadLocal() + { + protected PerThreadSettings initialValue() + { + PerThreadSettings settings = new PerThreadSettings(); + settings.conversationId = conversationIdGenerator.getAndIncrement(); + + return settings; + } + }; + + /** Generates new coversation id's as needed. */ + AtomicLong conversationIdGenerator = new AtomicLong(); + /** * Creates a conversation helper on the specified connection with the default sending destination, and listening * to the specified receiving destination. @@ -109,19 +136,53 @@ public class ConversationHelper * @param sendDestination The default sending destiation for all messages. * @param receiveDestination The destination to listen to for incoming messages. * @param queueClass The queue implementation class. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. */ public ConversationHelper(Connection connection, Destination sendDestination, Destination receiveDestination, - Class queueClass) - { } + Class> queueClass) throws JMSException + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(sendDestination); + consumer = session.createConsumer(receiveDestination); + + consumer.setMessageListener(new Receiver()); + + this.queueClass = queueClass; + } /** * Sends a message to the default sending location. The correlation id of the message will be assigned by this * method, overriding any previously set value. * * @param message The message to send. + * + * @throws JMSException All undelying JMSExceptions are allowed to fall through. + */ + public void send(Message message) throws JMSException + { + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + message.setJMSCorrelationID(Long.toString(conversationId)); + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + producer.send(message); + } + + /** + * Ensures that the reply queue for a conversation exists. + * + * @param conversationId The conversation correlation id. */ - public void send(Message message) - { } + private void initQueueForId(long conversationId) + { + if (!idsToQueues.containsKey(conversationId)) + { + idsToQueues.put(conversationId, ReflectionUtils.>newInstance(queueClass)); + } + } /** * Gets the next message in an ongoing conversation. This method may block until such a message is received. @@ -130,7 +191,22 @@ public class ConversationHelper */ public Message receive() { - return null; + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + + // Ensure that the reply queue for this conversation exists. + initQueueForId(conversationId); + + BlockingQueue queue = idsToQueues.get(conversationId); + + try + { + return queue.take(); + } + catch (InterruptedException e) + { + return null; + } } /** @@ -138,7 +214,18 @@ public class ConversationHelper * conversation are no longer valid, and any incoming messages using them will go to the dead letter box. */ public void end() - { } + { + // Ensure that the thread local for the current thread is cleaned up. + PerThreadSettings settings = threadLocals.get(); + long conversationId = settings.conversationId; + threadLocals.remove(); + + // Ensure that its queue is removed from the queue map. + BlockingQueue queue = idsToQueues.remove(conversationId); + + // Move any outstanding messages on the threads conversation id into the dead letter box. + queue.drainTo(deadLetterBox); + } /** * Clears the dead letter box, returning all messages that were in it. @@ -147,7 +234,10 @@ public class ConversationHelper */ public Collection emptyDeadLetterBox() { - return null; + Collection result = new LinkedList(); + deadLetterBox.drainTo(result); + + return result; } /** @@ -162,6 +252,32 @@ public class ConversationHelper * @param message The incoming message. */ public void onMessage(Message message) - { } + { + try + { + Long conversationId = Long.parseLong(message.getJMSCorrelationID()); + + // Find the converstaion queue to place the message on. If there is no conversation for the message id, + // the the dead letter box queue is used. + BlockingQueue queue = idsToQueues.get(conversationId); + queue = (queue == null) ? deadLetterBox : queue; + + queue.put(message); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + protected class PerThreadSettings + { + /** Holds the correlation id for the current threads conversation. */ + long conversationId; } } -- cgit v1.2.1