From 5518fd899d97459bcd8c45b850da447697a60fe8 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 23 Apr 2008 23:56:38 +0000 Subject: QPID-832 sync from M2.x git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651113 13f79535-47bb-0310-9956-ffa450edef68 --- .../Qpid.Integration.Tests/framework/alljava.csx | 7851 ++++++++++++++++++++ 1 file changed, 7851 insertions(+) create mode 100644 qpid/dotnet/Qpid.Integration.Tests/framework/alljava.csx (limited to 'qpid/dotnet/Qpid.Integration.Tests/framework/alljava.csx') diff --git a/qpid/dotnet/Qpid.Integration.Tests/framework/alljava.csx b/qpid/dotnet/Qpid.Integration.Tests/framework/alljava.csx new file mode 100644 index 0000000000..23ebd53a5b --- /dev/null +++ b/qpid/dotnet/Qpid.Integration.Tests/framework/alljava.csx @@ -0,0 +1,7851 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// Assertion models an assertion on a test . + /// + ///

+ ///
CRC Card
Responsibilities + ///
Indicate whether or not the assertion passes when applied. + ///
+ ///

+ public interface Assertion + { + /// + /// Applies the assertion. + /// + /// true if the assertion passes, false if it fails. + public bool apply(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// AssertionBase is a base class for implenmenting assertions. It provides a mechanism to store error messages, and + /// report all error messages when its method is called. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Collect error messages. + ///
+ ///

+ public abstract class AssertionBase : Assertion + { + /// Holds the error messages. + IList errors = new LinkedList(); + + /// + /// Adds an error message to the assertion. + /// + /// An error message to add to the assertion. + public void addError(string error) + { + errors.add(error); + } + + /// + /// Prints all of the error messages in the assertion into a string. + /// + /// All of the error messages in the assertion as a string. + public string ToString() + { + string result = ""; + + for (string error : errors) + { + result += error; + } + + return result; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// BrokerLifecycleAware is an awareness interface implemented by test cases that can run control the life-cycle of + /// the brokers on which they run. Its purpose is to expose additional instrumentation of brokers during testing, that + /// enables tests to use an automated failure mechanism to simulate broker failures, and to re-start failed brokers. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Indicate whether or not a test case is using an in-vm broker. + ///
Track which in-vm broker is currently in use. + ///
Accept setting of a failure mechanism. . + ///
+ ///

+ /// + /// Need to think about how to present the brokers through this interface. Thinking numbering the available + /// brokers from 1 will do. Then can kill 1 and assume failing onto 2. Restart 1 and kill 2 and fail back onto + /// 1 again? + public interface BrokerLifecycleAware + { + public void setInVmBrokers(); + + /// + /// Indicates whether or not a test case is using in-vm brokers. + /// + /// true if the test is using in-vm brokers, false otherwise. + public bool usingInVmBroker(); + + /// + /// Sets the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public void setLiveBroker(int i); + + /// + /// Reports the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public int getLiveBroker(); + + /// + /// Accepts a failure mechanism. + /// + /// The failure mechanism. + public void setFailureMechanism(CauseFailure failureMechanism); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// CauseFailure provides a method to cause a failure in a messaging broker, usually used in conjunction with fail-over + /// or other failure mode testing. In some cases failures may be automated, for example by shutting down an in-vm broker, + /// or by sending a special control signal to a broker over a network connection. In other cases, it may be preferable + /// to ask a user interactively to cause a failure scenario, in which case an implementation may display a prompt or + /// dialog box asking for notification once the failure has been caused. The purpose of this interface is to abstract + /// the exact cause and nature of a failure out of failure test cases. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Cause messaging broker failure. + ///
+ ///

+ public interface CauseFailure + { + /// Causes the active message broker to fail. + void causeFailure(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.CauseFailure; + +using java.io.IOException; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// Causes a message broker failure by interactively prompting the user to cause it. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Cause messaging broker failure. + ///
+ ///

+ public class CauseFailureUserPrompt : CauseFailure + { + /// Causes the active message broker to fail. + public void causeFailure() + { + waitForUser("Cause a broker failure now, then press Return."); + } + + /// + /// Outputs a prompt to the console and waits for the user to press return. + /// + /// The prompt to display on the console. + private void waitForUser(string prompt) + { + System.out.println(prompt); + + try + { + System.in.read(); + } + catch (IOException e) + { + // Ignored. + } + + System.out.println("Continuing."); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A Circuit is the basic test unit against which test cases are to be written. A circuit consists of two 'ends', an + /// instigating 'publisher' end and a more passive 'receivers' end. + /// + ///

Once created, the life-cycle of a circuit may be controlled by ing it, or ing it. + /// Once started, the circuit is ready to send messages over. Once closed the circuit can no longer be used. + /// + ///

The state of the circuit may be taken with the method, and asserted against by the + /// method. + /// + ///

There is a default test procedure which may be performed against the circuit. The outline of this procedure is: + /// + ///

+    /// Start the circuit.
+    /// Send test messages.
+    /// Request a status report.
+    /// Assert conditions on the publishing end of the circuit.
+    /// Assert conditions on the receiving end of the circuit.
+    /// Close the circuit.
+    /// Pass with no failed assertions or fail with a list of failed assertions.
+    /// 
+ /// + ///

+ ///
CRC Card
Responsibilities + ///
Supply the publishing and receiving ends of a test messaging circuit. + ///
Start the circuit running. + ///
Close the circuit down. + ///
Take a reading of the circuits state. + ///
Apply assertions against the circuits state. + ///
Send test messages over the circuit. + ///
Perform the default test procedue on the circuit. + ///
+ ///

+ public interface Circuit + { + /// + /// Gets the interface on the publishing end of the circuit. + /// + /// The publishing end of the circuit. + public Publisher getPublisher(); + + /// + /// Gets the interface on the receiving end of the circuit. + /// + /// The receiving end of the circuit. + public Receiver getReceiver(); + + /// + /// Connects and starts the circuit. After this method is called the circuit is ready to send messages. + public void start(); + + /// + /// Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit, + /// into a report, against which assertions may be checked. + public void check(); + + /// + /// Closes the circuit. All associated resources are closed. + public void close(); + + /// + /// Applied a list of assertions against the test circuit. The method should be called before doing + /// this, to ensure that the circuit has gathered its state into a report to assert against. + /// + /// The list of assertions to apply to the circuit. + /// + /// Any assertions that failed. + public IList applyAssertions(List assertions); + + /// + /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold. + /// + /// The number of messages to send using the default test procedure. + /// The list of assertions to apply. + /// + /// Any assertions that failed. + public IList test(int numMessages, List assertions); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A CircuitEnd is a pair consisting of one message producer and one message consumer, that represents one end of a + /// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both + /// the consumer and producer are instantiated and configured. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
+ ///

+ /// + /// Update the so that it accepts these as the basic conversation + /// connection units. + public interface CircuitEnd + { + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer(); + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer(); + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// + /// Any JMS exception occuring during the send is allowed to fall through. + public void send(Message message) throws JMSException; + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession(); + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException; + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. + public MessageMonitor getMessageMonitor(); + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A CircuitEndBase is a pair consisting of one message producer and one message consumer, that represents one end of a + /// test circuit. It is a standard unit of connectivity allowing a full-duplex conversation to be held, provided both + /// the consumer and producer are instantiated and configured. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
+ ///

+ public class CircuitEndBase : CircuitEnd + { + /// Holds the single message producer. + MessageProducer producer; + + /// Holds the single message consumer. + MessageConsumer consumer; + + /// Holds the controlSession for the circuit end. + Session session; + + /// Holds the message monitor for the circuit end. + MessageMonitor messageMonitor; + + /// Holds the exception monitor for the circuit end. + ExceptionMonitor exceptionMonitor; + + /// + /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also configured + /// for messages and exceptions received by the circuit end. + /// + /// The message producer for the circuit end point. + /// The message consumer for the circuit end point. + /// The controlSession for the circuit end point. + /// The monitor to notify of all messages received by the circuit end. + /// The monitor to notify of all exceptions received by the circuit end. + public CircuitEndBase(MessageProducer producer, MessageConsumer consumer, Session session, MessageMonitor messageMonitor, + ExceptionMonitor exceptionMonitor) + { + this.producer = producer; + this.consumer = consumer; + this.session = session; + + this.messageMonitor = messageMonitor; + this.exceptionMonitor = exceptionMonitor; + } + + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer() + { + return producer; + } + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer() + { + return consumer; + } + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// Any JMS exception occuring during the send is allowed to fall through. + public void send(Message message) throws JMSException + { + producer.send(message); + } + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession() + { + return session; + } + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException + { + if (producer != null) + { + producer.close(); + } + + if (consumer != null) + { + consumer.close(); + } + } + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. + public MessageMonitor getMessageMonitor() + { + return messageMonitor; + } + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchFailureException represents failure of a to achieve synchronization. For example, + /// this could be because a reference signal is not available, or because a desired accurracy cannot be attained. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Represent failure to achieve synchronization. + ///
+ ///

+ public class ClockSynchFailureException extends Exception + { + /// + /// Creates a clock synch failure exception. + /// + /// The detail message (which is saved for later retrieval by the method). + /// The cause (which is saved for later retrieval by the method). (A null + /// value is permitted, and indicates that the cause is nonexistent or unknown.) + public ClockSynchFailureException(string message, Throwable cause) + { + super(message, cause); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. It is expected that one + /// node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave, + /// and which must apply a delta to its local clock to get a clock synchronized with the reference. + /// + ///

The slave side will initiate the computation of a clock delta by calling the method. This method + /// will not return until the delta has been computed, at which point there is a method to return its value, as well as + /// an estimate of the likely error (usually one standard deviation), in the synchronization. For convenience there is a + /// method to return the value of System.nanoTime() with the delta added in. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronization. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronzation. + ///
+ ///

+ public interface ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException; + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta(); + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon(); + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.ShutdownHookable; +using uk.co.thebadgerset.junit.extensions.Throttle; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// ClockSynchThread is a convenient utility for running a thread that periodically synchronizes the clock against + /// a reference. Supply it with a and a and it will continually keep the + /// clock up-to-date at a rate determined by the throttle. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Continually sychronize the clock at a throttled rate. + ///
+ ///

+ public class ClockSynchThread extends Thread : ShutdownHookable + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(ClockSynchThread)); + + /// Holds the clock syncher for the synch thread. + private ClockSynchronizer clockSyncher; + + /// Holds the throttle to limit the synch rate. + private Throttle throttle; + + /// Flag to indicate that the periodic clock syncher should keep running. + bool doSynch = true; + + /// + /// Creates a clock synchronizer thread from a clock synchronizer and a throttle. + /// + /// The clock synchronizer. + /// The throttle. + public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle) + { + this.clockSyncher = syncher; + this.throttle = throttle; + } + + /// Terminates the synchronization thread. + public void terminate() + { + doSynch = false; + } + + /// Continually updates the clock, until is called. + public void run() + { + while (doSynch) + { + // Perform a clock clockSynch. + try + { + // Wait controlled by the throttle before doing the next synch. + throttle.throttle(); + + clockSyncher.synch(); + log.debug("Clock synched, delta = " + clockSyncher.getDelta() + ", epsilon = " + clockSyncher.getEpsilon() + + "."); + } + // Terminate the synch thread if the synchronization cannot be achieved. + catch (ClockSynchFailureException e) + { + log.debug("Cannot synchronize the clock (reference service may be down). Terminating the synch thread."); + doSynch = false; + } + } + } + + /// + /// Gets the clock synchronizer that is kept continually up to date. + /// + /// The clock synchronizer that is kept continually up to date. + public ClockSynchronizer getClockSyncher() + { + return clockSyncher; + } + + /// + /// Supplies a shutdown hook, that terminates the synching thread. + /// + /// The shut down hook. + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + doSynch = false; + } + }); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + + /// + /// LocalClockSynchronizer is a fake that simply calls System.nanoTime(). It exists so that + /// the same tests can be run distributed or locally, taking timings against the ClockSynchronizer interface without + /// being aware of how they are being run. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the local clock with no delta. + ///
+ ///

+ public class LocalClockSynchronizer : ClockSynchronizer + { + /// + /// The slave side should call this to copute a clock delta with the reference. + /// + /// If synchronization cannot be achieved. + public void synch() throws ClockSynchFailureException + { } + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta() + { + return 0L; + } + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon() + { + return 0L; + } + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime() + { + return System.nanoTime(); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.ShutdownHookable; + +using java.io.IOException; +using java.net.*; +using java.nio.ByteBuffer; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// UDPClockReference supplies a refernce clock signal (generated from System.nanoTime()). + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply a reference clock signal. + ///
+ ///

+ /// + /// Port hard coded. Make configurable. + /// + /// Errors rethrown as runtimes, or silently terminate the service. Could add better error handling if needed. + public class UDPClockReference : Runnable, ShutdownHookable + { + /// Used for debugging. + // private static ILog log = LogManager.GetLogger(typeof(UDPClockReference)); + + /// Defines the timeout to use when polling the socket for time requests. + private static final int TIMEOUT = 200; + + /// Defines the port to run the clock reference on. + public static final int REFERENCE_PORT = 4444; + + /// Holds the socket to receive clock reference requests on. + protected DatagramSocket socket = null; + + /// Flag used to indicate that the time server should keep running. Set to false to terminate. + protected bool publish = true; + + /// Creates a clock reference service on the standard port. + public UDPClockReference() + { + try + { + socket = new DatagramSocket(REFERENCE_PORT); + socket.setSoTimeout(TIMEOUT); + } + catch (SocketException e) + { + throw new RuntimeException(e); + } + } + + /// + /// Implements the run loop for this reference time server. This waits for incoming time requests, and replies to + /// any, with a message with the local time stamp in it. Periodically (controlled by ), the run + /// loop will check if the flag has been cleared, and terminate the reference time service if so. + /// + public void run() + { + byte[] buf = new byte[256]; + ByteBuffer bbuf = ByteBuffer.wrap(buf); + + while (publish) + { + try + { + // Wait for a reference time request. + DatagramPacket packet = new DatagramPacket(buf, buf.length); + bool timedOut = false; + + try + { + socket.receive(packet); + } + catch (SocketTimeoutException e) + { + timedOut = true; + } + + if (!timedOut) + { + // Work out from the received packet, where to reply to. + InetAddress address = packet.getAddress(); + int port = packet.getPort(); + + // Respond to the time request by sending back the local clock as the reference time. + bbuf.putLong(System.nanoTime()); + bbuf.flip(); + packet = new DatagramPacket(bbuf.array(), bbuf.capacity(), address, port); + + socket.send(packet); + } + } + catch (IOException e) + { + publish = false; + } + } + + socket.close(); + } + + /// + /// Supplies a shutdown hook. + /// + /// The shut down hook. + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + publish = false; + } + }); + } + + /// + /// For testing purposes. Runs a reference clock on the default port. + /// + /// None. + public static void main(String[] args) + { + try + { + // Create the clock reference service. + UDPClockReference clock = new UDPClockReference(); + + // Set up a shutdown hook for it. + Runtime.getRuntime().addShutdownHook(clock.getShutdownHook()); + + // Start the service. + clock.run(); + } + catch (Exception e) + { + e.printStackTrace(); + System.exit(1); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using java.io.IOException; +using java.net.*; +using java.nio.ByteBuffer; +using java.util.Arrays; + +namespace Apache.Qpid.Integration.Tests.framework.clocksynch +{ + /// + /// UDPClockSynchronizer is a that sends pings as UDP datagrams, and uses the following simple + /// algorithm to perform clock synchronization: + /// + ///
    + ///
  1. Slave initiates synchronization with a Reference clock.
  2. + ///
  3. Slave stamps current local time on a "time request" message and sends to the Reference.
  4. + ///
  5. Upon receipt by Reference, Reference stamps Reference-time and returns.
  6. + ///
  7. Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It + /// subtracts current time from Reference time to determine Slave-Reference time delta and adds in the + /// half-latency to get the correct clock delta.
  8. + ///
  9. The first result is immediately used to update the clock since it will get the local clock into at least + /// the right ballpark.
  10. + ///
  11. The Slave repeats steps 2 through 4, 15 more times.
  12. + ///
  13. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The + /// median latency is determined by picking the mid-point sample from this ordered list.
  14. + ///
  15. All samples outside 1 standard-deviation from the median are discarded and the remaining samples + /// are averaged using an arithmetic mean.
  16. + ///
+ /// + ///

The use of UDP datagrams, instead of TCP based communication eliminates the hidden delays that TCP can introduce, + /// as it can transparently re-order or re-send packets, or introduce delays as packets are naggled. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Trigger a clock synchronziation. + ///
Compute a clock delta to apply to the local clock. + ///
Estimate the error in the synchronzation. + ///
+ ///

+ public class UDPClockSynchronizer : ClockSynchronizer + { + /// Used for debugging. + // private static ILog log = LogManager.GetLogger(typeof(UDPClockSynchronizer)); + + /// Defines the timeout to use when waiting for responses to time requests. + private static final int TIMEOUT = 50; + + /// The clock delta. + private long delta = 0L; + + /// Holds an estimate of the clock error relative to the reference clock. + private long epsilon = 0L; + + /// Holds the address of the reference clock. + private InetAddress referenceAddress; + + /// Holds the socket to communicate with the reference service over. + private DatagramSocket socket; + + /// Used to control the shutdown in the main test loop. + private static bool doSynch = true; + + /// + /// Creates a clock synchronizer against the specified address for the reference. + /// + /// The address of the reference service. + public UDPClockSynchronizer(string address) + { + try + { + referenceAddress = InetAddress.getByName(address); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + /// + /// The slave side should call this to compute a clock delta with the reference. + /// + /// If synchronization cannot be achieved, due to unavailability of the reference + /// time service. + public void synch() throws ClockSynchFailureException + { + try + { + socket = new DatagramSocket(); + socket.setSoTimeout(TIMEOUT); + + // Synchronize on a single ping, to get the clock into the right ball-park. + synch(1); + + // Synchronize on 15 pings. + synch(15); + + // And again, for greater accuracy, on 31. + synch(31); + + socket.close(); + } + catch (SocketException e) + { + throw new RuntimeException(e); + } + } + + /// + /// Updates the synchronization delta by performing the specified number of reference clock requests. + /// + /// The number of reference clock request cycles to perform. + /// + /// If synchronization cannot be achieved, due to unavailability of the reference + /// time service. + protected void synch(int n) throws ClockSynchFailureException + { + // log.debug("protected void synch(int n = " + n + "): called"); + + // Create an array of deltas by performing n reference pings. + long[] delta = new long[n]; + + for (int i = 0; i < n; i++) + { + delta[i] = ping(); + } + + // Reject any deltas that are larger than 1 s.d. above the median. + long median = median(delta); + long sd = standardDeviation(delta); + + // log.debug("median = " + median); + // log.debug("sd = " + sd); + + long[] tempDeltas = new long[n]; + int count = 0; + + for (int i = 0; i < n; i++) + { + if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd))) + { + tempDeltas[count] = delta[i]; + count++; + } + else + { + // log.debug("Rejected: " + delta[i]); + } + } + + System.arraycopy(tempDeltas, 0, delta, 0, count); + + // Estimate the delta as the mean of the remaining deltas. + this.delta += mean(delta); + + // Estimate the error as the standard deviation of the remaining deltas. + this.epsilon = standardDeviation(delta); + + // log.debug("this.delta = " + this.delta); + // log.debug("this.epsilon = " + this.epsilon); + } + + /// + /// Performs a single reference clock request cycle and returns the estimated delta relative to the local clock. + /// This is computed as the half-latency of the requst cycle, plus the reference clock, minus the local clock. + /// + /// The estimated clock delta. + /// + /// If the reference service is not responding. + protected long ping() throws ClockSynchFailureException + { + // log.debug("protected long ping(): called"); + + try + { + byte[] buf = new byte[256]; + + bool timedOut = false; + long start = 0L; + long refTime = 0L; + long localTime = 0L; + long latency = 0L; + int failCount = 0; + + // Keep trying the ping until it gets a response, or 10 tries in a row all time out. + do + { + // Start timing the request latency. + start = nanoTime(); + + // Get the reference time. + DatagramPacket packet = + new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT); + socket.send(packet); + packet = new DatagramPacket(buf, buf.length); + + timedOut = false; + + try + { + socket.receive(packet); + } + catch (SocketTimeoutException e) + { + timedOut = true; + failCount++; + + continue; + } + + ByteBuffer bbuf = ByteBuffer.wrap(packet.getData()); + refTime = bbuf.getLong(); + + // Stop timing the request latency. + localTime = nanoTime(); + latency = localTime - start; + + // log.debug("refTime = " + refTime); + // log.debug("localTime = " + localTime); + // log.debug("start = " + start); + // log.debug("latency = " + latency); + // log.debug("delta = " + ((latency / 2) + (refTime - localTime))); + + } + while (timedOut && (failCount < 10)); + + // Fail completely if the fail count is too high. + if (failCount >= 10) + { + throw new ClockSynchFailureException("Clock reference not responding.", null); + } + + // Estimate delta as (ref clock + half-latency) - local clock. + return (latency / 2) + (refTime - localTime); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /// + /// Gets the clock delta in nano seconds. + /// + /// The clock delta in nano seconds. + public long getDelta() + { + return delta; + } + + /// + /// Gets an estimate of the clock error in nan seconds. + /// + /// An estimate of the clock error in nan seconds. + public long getEpsilon() + { + return epsilon; + } + + /// + /// Gets the local clock time with any computed delta added in. + /// + /// The local clock time with any computed delta added in. + public long nanoTime() + { + return System.nanoTime() + delta; + } + + /// + /// Computes the median of a series of values. + /// + /// The values. + /// + /// The median. + public static long median(long[] values) + { + // log.debug("public static long median(long[] values = " + Arrays.ToString(values) + "): called"); + + long median; + + // Order the list of values. + long[] orderedValues = new long[values.length]; + System.arraycopy(values, 0, orderedValues, 0, values.length); + Arrays.sort(orderedValues); + + // Check if the median is computed from a pair of middle value. + if ((orderedValues.length % 2) == 0) + { + int middle = orderedValues.length / 2; + + median = (orderedValues[middle] + orderedValues[middle - 1]) / 2; + } + // The median is computed from a single middle value. + else + { + median = orderedValues[orderedValues.length / 2]; + } + + // log.debug("median = " + median); + + return median; + } + + /// + /// Computes the mean of a series of values. + /// + /// The values. + /// + /// The mean. + public static long mean(long[] values) + { + // log.debug("public static long mean(long[] values = " + Arrays.ToString(values) + "): called"); + + long total = 0L; + + for (long value : values) + { + total += value; + } + + long mean = total / values.length; + + // log.debug("mean = " + mean); + + return mean; + } + + /// + /// Computes the variance of series of values. + /// + /// The values. + /// + /// The variance of the values. + public static long variance(long[] values) + { + // log.debug("public static long variance(long[] values = " + Arrays.ToString(values) + "): called"); + + long mean = mean(values); + + long totalVariance = 0; + + for (long value : values) + { + long diff = (value - mean); + totalVariance += diff/// diff; + } + + long variance = totalVariance / values.length; + + // log.debug("variance = " + variance); + + return variance; + } + + /// + /// Computes the standard deviation of a series of values. + /// + /// The values. + /// + /// The standard deviation. + public static long standardDeviation(long[] values) + { + // log.debug("public static long standardDeviation(long[] values = " + Arrays.ToString(values) + "): called"); + + long sd = Double.valueOf(Math.sqrt(variance(values))).longValue(); + + // log.debug("sd = " + sd); + + return sd; + } + + /// + /// For testing purposes. Supply address of reference clock as arg 1. + /// + /// Address of reference clock as arg 1. + public static void main(String[] args) + { + ParsedProperties options = + new ParsedProperties(CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + { "1", "Address of clock reference service.", "address", "true" } + }), System.getProperties())); + + string address = options.getProperty("1"); + + // Create a clock synchronizer. + UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address); + + // Set up a shutdown hook for it. + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() + { + public void run() + { + doSynch = false; + } + })); + + // Repeat the clock synching until the user kills the progam. + while (doSynch) + { + // Perform a clock clockSynch. + try + { + clockSyncher.synch(); + + // Print out the clock delta and estimate of the error. + System.out.println("Delta = " + clockSyncher.getDelta()); + System.out.println("Epsilon = " + clockSyncher.getEpsilon()); + + try + { + Thread.sleep(250); + } + catch (InterruptedException e) + { + // Restore the interrupted status and terminate the loop. + Thread.currentThread().interrupt(); + doSynch = false; + } + } + // Terminate if the reference time service is unavailable. + catch (ClockSynchFailureException e) + { + doSynch = false; + } + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.*; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.TimingController; +using uk.co.thebadgerset.junit.extensions.TimingControllerAware; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.Destination; +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.Session; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// DistributedCircuitImpl is a distributed implementation of the test . Many publishers and receivers + /// accross multiple machines may be combined to form a single test circuit. The test circuit extracts reports from + /// all of its publishers and receivers, and applies its assertions to these reports. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the publishing and receiving ends of a test messaging circuit. + ///
Start the circuit running. + ///
Close the circuit down. + ///
Take a reading of the circuits state. + ///
Apply assertions against the circuits state. + ///
Send test messages over the circuit. + ///
Perform the default test procedue on the circuit. + ///
+ ///

+ /// + /// There is a short pause after receiving sender reports before asking for receiver reports, because receivers may + /// not have finished receiving all their test messages before the report request arrives. This is going to be a + /// problem for taking test timings and needs to be eliminiated. Suggested solution: have receiver send back reports + /// asynchronously, on test batch size boundaries, and do so automatically rather than having to have the report + /// request sent to them. Number each test run, or otherwise uniquely identify it, when a receiver does not get + /// any more messages on a test run for more than a timeout, it can assume the test is complete and send a final + /// report. On the coordinator end a future will need to be created to wait for all final reports to come in, and + /// to register results and timings for the test. This must work in such a way that a new test cycle can be started + /// without waiting for the results of the old one to come in. + /// + /// Add in setting of timing controller, from timing aware test cases. + public class DistributedCircuitImpl : Circuit, TimingControllerAware + { + /// Used for debugging purposes. + private static ILog log = LogManager.GetLogger(typeof(DistributedCircuitImpl)); + + /// Holds the conversation factory over which to coordinate the test. + protected ConversationFactory conversationFactory; + + /// Holds the controlSession over which to hold the control conversation. + protected Session controlSession; + + /// Holds the sender nodes in the test circuit. + protected IList senders; + + /// Holds the receiver nodes in the test circuit. + protected IList receivers; + + /// Holds the sender control conversations. + protected ConversationFactory.Conversation[] senderConversation; + + /// Holds the receiver control conversations. + protected ConversationFactory.Conversation[] receiverConversation; + + /// Holds the control topics for the senders in the test circuit. + protected Destination[] senderControlTopic; + + /// Holds the control topics for the receivers in the test circuit. + protected Destination[] receiverControlTopic; + + /// Holds the number of messages to send per test run. + protected int numMessages; + + /// + /// Holds the timing controller for the circuit. This is used to log test times asynchronously, when reciever nodes + /// return their reports after senders have completed a test case. + TimingController timingController; + + /// + /// Creates a distributed test circuit on the specified senders and receivers. + /// + /// The controlSession for all control conversations. + /// The senders. + /// The receivers. + /// A control conversation with the senders. + /// A control conversation with the receivers. + /// The senders control topic. + /// The receivers control topic. + protected DistributedCircuitImpl(Session session, IList senders, List receivers, + ConversationFactory.Conversation[] senderConversation, ConversationFactory.Conversation[] receiverConversation, + Destination[] senderControlTopic, Destination[] receiverControlTopic) + { + this.controlSession = session; + this.senders = senders; + this.receivers = receivers; + this.senderConversation = senderConversation; + this.receiverConversation = receiverConversation; + this.senderControlTopic = senderControlTopic; + this.receiverControlTopic = receiverControlTopic; + } + + /// + /// Creates a distributed test circuit from the specified test parameters, on the senders and receivers + /// given. + /// + /// The test parameters. + /// The sender ends in the test circuit. + /// The receiver ends in the test circuit. + /// A conversation factory for creating the control conversations with senders and receivers. + /// + /// A connected and ready to start, test circuit. + public static Circuit createCircuit(ParsedProperties testProps, IList senders, + IList receivers, ConversationFactory conversationFactory) + { + log.debug("public static Circuit createCircuit(ParsedProperties testProps, IList senders, " + + " IList receivers, ConversationFactory conversationFactory)"); + + try + { + Session session = conversationFactory.getSession(); + + // Create control conversations with each of the senders. + ConversationFactory.Conversation[] senderConversation = new ConversationFactory.Conversation[senders.size()]; + Destination[] senderControlTopic = new Destination[senders.size()]; + + for (int i = 0; i < senders.size(); i++) + { + TestClientDetails sender = senders.get(i); + + senderControlTopic[i] = session.createTopic(sender.privateControlKey); + senderConversation[i] = conversationFactory.startConversation(); + } + + log.debug("Sender conversations created."); + + // Create control conversations with each of the receivers. + ConversationFactory.Conversation[] receiverConversation = new ConversationFactory.Conversation[receivers.size()]; + Destination[] receiverControlTopic = new Destination[receivers.size()]; + + for (int i = 0; i < receivers.size(); i++) + { + TestClientDetails receiver = receivers.get(i); + + receiverControlTopic[i] = session.createTopic(receiver.privateControlKey); + receiverConversation[i] = conversationFactory.startConversation(); + } + + log.debug("Receiver conversations created."); + + // Assign the sender role to each of the sending test clients. + for (int i = 0; i < senders.size(); i++) + { + TestClientDetails sender = senders.get(i); + + Message assignSender = conversationFactory.getSession().createMessage(); + TestUtils.setPropertiesOnMessage(assignSender, testProps); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + + senderConversation[i].send(senderControlTopic[i], assignSender); + } + + log.debug("Sender role assignments sent."); + + // Assign the receivers role to each of the receiving test clients. + for (int i = 0; i < receivers.size(); i++) + { + TestClientDetails receiver = receivers.get(i); + + Message assignReceiver = session.createMessage(); + TestUtils.setPropertiesOnMessage(assignReceiver, testProps); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + + receiverConversation[i].send(receiverControlTopic[i], assignReceiver); + } + + log.debug("Receiver role assignments sent."); + + // Wait for the senders and receivers to confirm their roles. + for (int i = 0; i < senders.size(); i++) + { + senderConversation[i].receive(); + } + + log.debug("Got all sender role confirmations"); + + for (int i = 0; i < receivers.size(); i++) + { + receiverConversation[i].receive(); + } + + log.debug("Got all receiver role confirmations"); + + // Package everything up as a circuit. + return new DistributedCircuitImpl(session, senders, receivers, senderConversation, receiverConversation, + senderControlTopic, receiverControlTopic); + } + catch (JMSException e) + { + throw new RuntimeException("JMSException not handled."); + } + } + + /// + /// Used by tests cases that can supply a to set the + /// controller on an aware test. + /// + /// The timing controller. + public void setTimingController(TimingController controller) + { + this.timingController = controller; + } + + /// + /// Gets the interface on the publishing end of the circuit. + /// + /// The publishing end of the circuit. + public Publisher getPublisher() + { + throw new RuntimeException("Not Implemented."); + } + + /// + /// Gets the interface on the receiving end of the circuit. + /// + /// The receiving end of the circuit. + public Receiver getReceiver() + { + throw new RuntimeException("Not Implemented."); + } + + /// + /// Connects and starts the circuit. After this method is called the circuit is ready to send messages. + public void start() + { + log.debug("public void start(): called"); + + try + { + // Start the test on each of the senders. + Message start = controlSession.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + start.setIntProperty("MESSAGE_COUNT", numMessages); + + for (int i = 0; i < senders.size(); i++) + { + senderConversation[i].send(senderControlTopic[i], start); + } + + log.debug("All senders told to start their tests."); + } + catch (JMSException e) + { + throw new RuntimeException("Unhandled JMSException.", e); + } + } + + /// + /// Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit, + /// into a report, against which assertions may be checked. + /// + /// Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch + /// or synch logging of test timings is optional. Also need to provide an onMessage method that is capable + /// of receiving timing reports that receivers will generate during an ongoing test, on the test sample + /// size boundaries. The message timing logging code should be factored out as a common method that can + /// be called in response to the final report responses, or the onMessage method. Another alternative is + /// to abandon the final report request altogether and just use the onMessage method? I think the two + /// differ though, as the final report is used to apply assertions, and the ongoing report is just for + /// periodic timing results... In which case, maybe there needs to be a way for the onMessage method + /// to process just some of the incoming messages, and forward the rest on to the conversion helper, as + /// a sort of pre-conversation helper filter? Make conversation expose its onMessage method (it should + /// already) and allow another delivery thread to filter the incoming messages to the conversation. + public void check() + { + log.debug("public void check(): called"); + + try + { + // Wait for all the test senders to return their reports. + for (int i = 0; i < senders.size(); i++) + { + Message senderReport = senderConversation[i].receive(); + log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message count: " + + senderReport.getIntProperty("MESSAGE_COUNT")); + log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message time: " + + senderReport.getLongProperty("TEST_TIME")); + } + + log.debug("Got all sender test reports."); + + // Apply sender assertions to pass/fail the tests. + + // Inject a short pause to give the receivers time to finish receiving their test messages. + TestUtils.pause(500); + + // Ask the receivers for their reports. + Message statusRequest = controlSession.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + for (int i = 0; i < receivers.size(); i++) + { + receiverConversation[i].send(receiverControlTopic[i], statusRequest); + } + + log.debug("All receiver test reports requested."); + + // Wait for all receiver reports to come in, but do so asynchronously. + Runnable gatherAllReceiverReports = + new Runnable() + { + public void run() + { + try + { + // Wait for all the receivers to send their reports. + for (int i = 0; i < receivers.size(); i++) + { + Message receiverReport = receiverConversation[i].receive(); + + string clientName = receiverReport.getStringProperty("CLIENT_NAME"); + int messageCount = receiverReport.getIntProperty("MESSAGE_COUNT"); + long testTime = receiverReport.getLongProperty("TEST_TIME"); + + log.debug("Receiver " + clientName + " reports message count: " + messageCount); + log.debug("Receiver " + receiverReport.getStringProperty("CLIENT_NAME") + + " reports message time: " + testTime); + + // Apply receiver assertions to pass/fail the tests. + + // Log the test timings on the asynchronous test timing controller. + /*try + { + timingController.completeTest(true, messageCount, testTime); + } + // The timing controll can throw InterruptedException is the current test is to be + // interrupted. + catch (InterruptedException e) + { + e.printStackTrace(); + }*/ + } + + log.debug("All receiver test reports received."); + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + } + }; + + Thread receiverReportsThread = new Thread(gatherAllReceiverReports); + receiverReportsThread.start(); + + // return new Message[] { senderReport, receiverReport }; + + } + catch (JMSException e) + { + throw new RuntimeException("Unhandled JMSException.", e); + } + } + + /// Closes the circuit. All associated resources are closed. + public void close() + { + log.debug("public void close(): called"); + + // End the current test on all senders and receivers. + } + + /// + /// Applies a list of assertions against the test circuit. The method should be called before doing + /// this, to ensure that the circuit has gathered its state into a report to assert against. + /// + /// The list of assertions to apply. + /// + /// Any assertions that failed. + public IList applyAssertions(List assertions) + { + log.debug("public IList applyAssertions(List assertions = " + assertions + "): called"); + + IList failures = new LinkedList(); + + for (Assertion assertion : assertions) + { + if (!assertion.apply()) + { + failures.add(assertion); + } + } + + return failures; + } + + /// + /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold. + /// + /// The number of messages to send using the default test procedure. + /// The list of assertions to apply. + /// + /// Any assertions that failed. + /// + /// From check onwards needs to be handled as a future. The future must call back onto the test case to + /// report results asynchronously. + public IList test(int numMessages, List assertions) + { + log.debug("public IList test(int numMessages = " + numMessages + ", List assertions = " + + assertions + "): called"); + + // Keep the number of messages to send per test run, where the send method can reference it. + this.numMessages = numMessages; + + // Start the test running on all sender circuit ends. + start(); + + // Request status reports to be handed in. + check(); + + // Assert conditions on the publishing end of the circuit. + // Assert conditions on the receiving end of the circuit. + IList failures = applyAssertions(assertions); + + // Close the circuit ending the current test case. + close(); + + // Pass with no failed assertions or fail with a list of failed assertions. + return failures; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Publisher; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// DistributedPublisherImpl represents the status of the publishing side of a test circuit. Its main purpose is to + /// provide assertions that can be applied to verify the behaviour of a non-local publisher. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide assertion that the publishers received no exceptions. + ///
Provide assertion that the publishers received a no consumers error code on every message. + ///
Provide assertion that the publishers received a no route error code on every message. + ///
+ ///

+ public class DistributedPublisherImpl : Publisher + { + /// + /// Provides an assertion that the publisher encountered no exceptions. + /// + /// The test configuration properties. + /// An assertion that the publisher encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the publisher got a no consumers exception on every message. + /// + /// An assertion that the publisher got a no consumers exception on every message. + public Assertion noConsumersAssertion() + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the publisher got a no rout exception on every message. + /// + /// An assertion that the publisher got a no rout exception on every message. + public Assertion noRouteAssertion() + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the publisher got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. + /// An assertion that the publisher got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass) + { + throw new RuntimeException("Not implemented."); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Receiver; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// DistributedReceiverImpl represents the status of the receiving side of a test circuit. Its main purpose is to + /// provide assertions that can be applied to verify the behaviour of a non-local receiver. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide assertion that the receivers received no exceptions. + ///
Provide assertion that the receivers received all test messages sent to it. + ///
+ ///

+ public class DistributedReceiverImpl : Receiver + { + /// + /// Provides an assertion that the receivers encountered no exceptions. + /// + /// The test configuration properties. + /// An assertion that the receivers encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the receivers got all messages that were sent to it. + /// + /// The test configuration properties. + /// An assertion that the receivers got all messages that were sent to it. + public Assertion allMessagesReceivedAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the receivers got none of the messages that were sent to it. + /// + /// The test configuration properties. + /// An assertion that the receivers got none of the messages that were sent to it. + public Assertion noMessagesReceivedAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Provides an assertion that the receiver got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. An assertion that the receiver got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass) + { + throw new RuntimeException("Not implemented."); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.*; +using Apache.Qpid.Integration.Tests.framework.distributedtesting.TestClientControlledTest; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using javax.jms.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedcircuit +{ + /// + /// A TestClientCircuitEnd is a that may be controlled from a + /// , and that forms a single publishing or + /// receiving end point in a distributed test . + /// + ///

When operating in the SENDER role, this circuit end is capable of acting as part of the default circuit test + /// procedure (described in the class comment for ). That is, it will + /// send the number of test messages required, using the test configuration parameters given in the test invite, and + /// return a report on its activities to the circuit controller. + /// + ///

When operation in the RECEIVER role, this circuit end acts as part of the default circuit test procedure. It will + /// receive test messages, on the setup specified in the test configuration parameters, and keep count of the messages + /// received, and time taken to receive them. When requested by the circuit controller to provide a report, it will + /// return this report of its activities. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide a message producer for sending messages. + /// , , + ///
Provide a message consumer for receiving messages. + /// , , + ///
Supply the name of the test case that this implements. + ///
Accept/Reject invites based on test parameters. + ///
Adapt to assigned roles. + ///
Perform test case actions. + ///
Generate test reports. + ///
+ ///

+ public class TestClientCircuitEnd : CircuitEnd, TestClientControlledTest + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(TestClientCircuitEnd)); + + /// Holds the test parameters. + ParsedProperties testProps; + + /// The number of test messages to send. + private int numMessages; + + /// The role to be played by the test. + private Roles role; + + /// The connection to send the test messages on. + private Connection connection; + + /// Holds the circuit end for this test. + CircuitEnd circuitEnd; + + /// + /// Holds a message monitor for this circuit end, either the monitor on the consumer when in RECEIVER more, or + /// a monitor updated on every message sent, when acting as a SENDER. + MessageMonitor messageMonitor; + + /// + /// Should provide the name of the test case that this class implements. The exact names are defined in the + /// interop testing spec. + /// + /// The name of the test case that this implements. + public string getName() + { + return "DEFAULT_CIRCUIT_TEST"; + } + + /// + /// Determines whether the test invite that matched this test case is acceptable. + /// + /// The invitation to accept or reject. + /// true to accept the invitation, false to reject it. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public bool acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public bool acceptInvite(Message inviteMessage): called"); + + // Populate the test parameters from the invitation. + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + for (Object key : testProps.keySet()) + { + string propName = (String) key; + + // If the test parameters is overridden by the invitation, use it instead. + string inviteValue = inviteMessage.getStringProperty(propName); + + if (inviteValue != null) + { + testProps.setProperty(propName, inviteValue); + log.debug("Test invite supplied override to " + propName + " of " + inviteValue); + } + + } + + // Accept the invitation. + return true; + } + + /// + /// Assigns the role to be played by this test case. The test parameters are fully specified in the + /// assignment message. When this method return the test case will be ready to execute. + /// + /// The role to be played; sender or receivers. + /// The role assingment message, contains the full test parameters. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = 1; // assignRoleMessage.getIntProperty("NUM_MESSAGES"); + + // Connect using the test parameters. + connection = TestUtils.createConnection(testProps); + + // Create a circuit end that matches the assigned role and test parameters. + LocalCircuitFactory circuitFactory = new LocalCircuitFactory(); + + switch (role) + { + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + + // Set up the publisher. + circuitEnd = circuitFactory.createPublisherCircuitEnd(connection, testProps, 0L); + + // Create a custom message monitor that will be updated on every message sent. + messageMonitor = new MessageMonitor(); + + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages. + case RECEIVER: + + // Set up the receiver. + circuitEnd = circuitFactory.createReceiverCircuitEnd(connection, testProps, 0L); + + // Use the message monitor from the consumer for stats. + messageMonitor = getMessageMonitor(); + + break; + } + + // Reset all messaging stats for the report. + messageMonitor.reset(); + + connection.start(); + } + + /// + /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + /// + /// The number of test messages to send. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + /// + /// Add round robin on destinations where multiple destinations being used. + /// + /// Add rate limiting when rate limit specified on publishers. + /// + /// Add Max pending message size protection. The receiver will have to send back some acks once in a while, + /// to notify the publisher that its messages are being consumed. This makes the safety valve harder to + /// implement than in the single VM case. For example, if the limit is 1000 messages, might want to get back + /// an ack every 500, to notify the publisher that it can keep sending. What about pub/sub tests? Will it be + /// necessary to wait for an ack from every receiver? This will have the effect of rate limiting to slow + /// consumers too. + /// + /// Add commits on every commit batch size boundary. + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // If in the SENDER role, send the specified number of test messages to the circuit destinations. + if (role.equals(Roles.SENDER)) + { + Message testMessage = getSession().createMessage(); + + for (int i = 0; i < numMessages; i++) + { + getProducer().send(testMessage); + + // Increment the message count and timings. + messageMonitor.onMessage(testMessage); + } + } + } + + /// + /// Gets a report on the actions performed by the test case in its assigned role. + /// + /// The controlSession to create the report message in. + /// The report message. + /// + /// Any JMSExceptions resulting from creating the report are allowed to fall through. + public Message getReport(Session session) throws JMSException + { + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + + // Add the count of messages sent/received to the report. + report.setIntProperty("MESSAGE_COUNT", messageMonitor.getNumMessage()); + + // Add the time to send/receive messages to the report. + report.setLongProperty("TEST_TIME", messageMonitor.getTime()); + + // Add any exceptions detected to the report. + + return report; + } + + /// + /// Gets the message producer at this circuit end point. + /// + /// The message producer at with this circuit end point. + public MessageProducer getProducer() + { + return circuitEnd.getProducer(); + } + + /// + /// Gets the message consumer at this circuit end point. + /// + /// The message consumer at this circuit end point. + public MessageConsumer getConsumer() + { + return circuitEnd.getConsumer(); + } + + /// + /// Send the specified message over the producer at this end point. + /// + /// The message to send. + /// + /// Any JMS exception occuring during the send is allowed to fall through. + public void send(Message message) throws JMSException + { + // Send the message on the circuit ends producer. + circuitEnd.send(message); + } + + /// + /// Gets the JMS Session associated with this circuit end point. + /// + /// The JMS Session associated with this circuit end point. + public Session getSession() + { + return circuitEnd.getSession(); + } + + /// + /// Closes the message producers and consumers and the sessions, associated with this circuit end point. + /// + /// Any JMSExceptions occurring during the close are allowed to fall through. + public void close() throws JMSException + { + // Close the producer and consumer. + circuitEnd.close(); + } + + /// + /// Returns the message monitor for reporting on received messages on this circuit end. + /// + /// The message monitor for this circuit end. + public MessageMonitor getMessageMonitor() + { + return circuitEnd.getMessageMonitor(); + } + + /// + /// Returns the exception monitor for reporting on exceptions received on this circuit end. + /// + /// The exception monitor for this circuit end. + public ExceptionMonitor getExceptionMonitor() + { + return circuitEnd.getExceptionMonitor(); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using junit.framework.Test; +using junit.framework.TestResult; +using junit.framework.TestSuite; + +using log4net; +using org.apache.log4j.NDC; + +using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; +using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.TestUtils; +using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockReference; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.TKTestRunner; +using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; +using uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +using uk.co.thebadgerset.junit.extensions.util.MathUtils; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using javax.jms.*; + +using java.net.InetAddress; +using java.util.*; +using java.util.concurrent.LinkedBlockingQueue; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + ///

Implements the coordinator client described in the interop testing specification + /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). This coordinator is built on + /// top of the JUnit testing framework. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Find out what test clients are available. + ///
Decorate available tests to run on all available clients. + ///
Attach XML test result logger. + ///
Terminate the interop testing framework. + ///
+ ///

+ /// + /// Should accumulate failures over all tests, and return with success or fail code based on all results. May need + /// to write a special TestResult to do this properly. At the moment only the last one used will be tested for + /// errors, as the start method creates a fresh one for each test case run. + public class Coordinator extends TKTestRunner + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(Coordinator)); + + /// Used for reporting to the console. + private static ILog console = LogManager.GetLogger("CONSOLE"); + + /// Defines the possible distributed test engines available to run coordinated test cases with. + public enum TestEngine + { + /// Specifies the interop test engine. This tests all available clients in pairs. + INTEROP, + + /// Specifies the fanout test engine. This sets up one publisher role, and many reciever roles. + FANOUT + } + + /// + /// Holds the test context properties that provides the default test parameters, plus command line overrides. + /// This is initialized with the default test parameters, to which command line overrides may be applied. + protected static ParsedProperties testContextProperties = + TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /// Holds the URL of the broker to coordinate the tests on. + protected string brokerUrl; + + /// Holds the virtual host to coordinate the tests on. If null, then the default virtual host is used. + protected string virtualHost; + + /// Holds the list of all clients that enlisted, when the compulsory invite was issued. + protected Set enlistedClients = new HashSet(); + + /// Holds the conversation helper for the control conversation. + protected ConversationFactory conversationFactory; + + /// Holds the connection that the coordinating messages are sent over. + protected Connection connection; + + /// Holds the path of the directory to output test results too, if one is defined. + protected string reportDir; + + /// Holds the coordinating test engine type to run the tests through. + protected TestEngine engine; + + /// Flag that indicates that all test clients should be terminated upon completion of the test cases. + protected bool terminate; + + /// + /// Creates an interop test coordinator on the specified broker and virtual host. + /// + /// The number of times to repeat the test, or test batch size. + /// The length of time to run the tests for. -1 means no duration has been set. + /// The concurrency levels to ramp up to. + /// A delay in milliseconds between test runs. + /// The sets of 'size' parameters to pass to test. + /// The name of the test case to run. + /// The directory to output the test results to. + /// The name of the test run; used to name the output file. + /// Whether to print comments during test run. + /// The URL of the broker to connect to. + /// The virtual host to run all tests on. Optional, may be null. + /// The distributed test engine type to run the tests with. + /// true if test client nodes should be terminated at the end of the tests. + /// true if the CSV results listener should be attached. + /// true if the XML results listener should be attached. + /// List of factories for user specified decorators. + public Coordinator(Integer repetitions, Long duration, int[] threads, int delay, int[] params, string testCaseName, + string reportDir, string runName, bool verbose, string brokerUrl, string virtualHost, TestEngine engine, + bool terminate, bool csv, bool xml, IList decoratorFactories) + { + super(repetitions, duration, threads, delay, params, testCaseName, reportDir, runName, csv, xml, verbose, + decoratorFactories); + + log.debug("public Coordinator(Integer repetitions = " + repetitions + " , Long duration = " + duration + + ", int[] threads = " + Arrays.ToString(threads) + ", int delay = " + delay + ", int[] params = " + + Arrays.ToString(params) + ", string testCaseName = " + testCaseName + ", string reportDir = " + reportDir + + ", string runName = " + runName + ", bool verbose = " + verbose + ", string brokerUrl = " + brokerUrl + + ", string virtualHost =" + virtualHost + ", TestEngine engine = " + engine + ", bool terminate = " + + terminate + ", bool csv = " + csv + ", bool xml = " + xml + "): called"); + + // Retain the connection parameters. + this.brokerUrl = brokerUrl; + this.virtualHost = virtualHost; + this.reportDir = reportDir; + this.engine = engine; + this.terminate = terminate; + } + + /// + /// The entry point for the interop test coordinator. This client accepts the following command line arguments: + /// + ///

+ ///
-b The broker URL. Mandatory. + ///
-h The virtual host. Optional. + ///
-o The directory to output test results to. Optional. + ///
-e The type of test distribution engine to use. Optional. One of: interop, fanout. + ///
... Free arguments. The distributed test cases to run. + /// Mandatory. At least one must be defined. + ///
name=value Trailing argument define name/value pairs. Added to the test contenxt properties. + /// Optional. + ///
+ ///

+ /// The command line arguments. + public static void main(String[] args) + { + NDC.push("coordinator"); + log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called"); + console.info("Qpid Distributed Test Coordinator."); + + // Override the default broker url to be localhost:5672. + testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); + + try + { + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + // Any options and trailing name=value pairs are also injected into the test context properties object, + // to override any defaults that may have been set up. + ParsedProperties options = + new ParsedProperties(CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" }, + { + "e", "The test execution engine to use. Default is interop.", "engine", "interop", + "^interop$|^fanout$", "true" + }, + { "t", "Terminate test clients on completion of tests.", null, "false" }, + { "-csv", "Output test results in CSV format.", null, "false" }, + { "-xml", "Output test results in XML format.", null, "false" }, + { + "-trefaddr", "To specify an alternative to hostname for time singal reference.", + "address", "false" + }, + { + "c", "The number of tests to run concurrently.", "num", "false", + MathUtils.SEQUENCE_REGEXP + }, + { "r", "The number of times to repeat each test.", "num", "false" }, + { + "d", "The length of time to run the tests for.", "duration", "false", + MathUtils.DURATION_REGEXP + }, + { + "f", "The maximum rate to call the tests at.", "frequency", "false", + "^([1-9][0-9]*)/([1-9][0-9]*)$" + }, + { "s", "The size parameter to run tests with.", "size", "false", MathUtils.SEQUENCE_REGEXP }, + { "v", "Verbose mode.", null, "false" }, + { "n", "A name for this test run, used to name the output file.", "name", "true" }, + { + "X:decorators", "A list of additional test decorators to wrap the tests in.", + "\"class.name[:class.name]*\"", "false" + } + }), testContextProperties)); + + // Extract the command line options. + string brokerUrl = options.getProperty("b"); + string virtualHost = options.getProperty("h"); + string reportDir = options.getProperty("o"); + reportDir = (reportDir == null) ? "." : reportDir; + string testEngine = options.getProperty("e"); + TestEngine engine = "fanout".equals(testEngine) ? TestEngine.FANOUT : TestEngine.INTEROP; + bool terminate = options.getPropertyAsBoolean("t"); + bool csvResults = options.getPropertyAsBoolean("-csv"); + bool xmlResults = options.getPropertyAsBoolean("-xml"); + string threadsstring = options.getProperty("c"); + Integer repetitions = options.getPropertyAsInteger("r"); + string durationstring = options.getProperty("d"); + string paramsstring = options.getProperty("s"); + bool verbose = options.getPropertyAsBoolean("v"); + string testRunName = options.getProperty("n"); + string decorators = options.getProperty("X:decorators"); + + int[] threads = (threadsstring == null) ? null : MathUtils.parseSequence(threadsString); + int[] params = (paramsstring == null) ? null : MathUtils.parseSequence(paramsString); + Long duration = (durationstring == null) ? null : MathUtils.parseDuration(durationString); + + // If broker or virtual host settings were specified as command line options, override the defaults in the + // test context properties with them. + + // Collection all of the test cases to be run. + Collection> testCaseClasses = + new ArrayList>(); + + // Create a list of test decorator factories for use specified decorators to be applied. + IList decoratorFactories = parseDecorators(decorators); + + // Scan for available test cases using a classpath scanner. + // ClasspathScanner.getMatches(DistributedTestCase.class, "^Test.*", true); + + // Hard code the test classes till the classpath scanner is fixed. + // Collections.addAll(testCaseClasses, InteropTestCase1DummyRun.class, InteropTestCase2BasicP2P.class, + // InteropTestCase3BasicPubSub.class); + + // Parse all of the free arguments as test cases to run. + for (int i = 1; true; i++) + { + string nextFreeArg = options.getProperty(Integer.ToString(i)); + + // Terminate the loop once all free arguments have been consumed. + if (nextFreeArg == null) + { + break; + } + + try + { + Class nextClass = Class.forName(nextFreeArg); + + if (FrameworkBaseCase.class.isAssignableFrom(nextClass)) + { + testCaseClasses.add(nextClass); + console.info("Found distributed test case: " + nextFreeArg); + } + } + catch (ClassNotFoundException e) + { + console.info("Unable to instantiate the test case: " + nextFreeArg + "."); + } + } + + // Check that some test classes were actually found. + if (testCaseClasses.isEmpty()) + { + throw new RuntimeException( + "No test cases implementing FrameworkBaseCase were specified on the command line."); + } + + // Extract the names of all the test classes, to pass to the start method. + int i = 0; + String[] testClassNames = new String[testCaseClasses.size()]; + + for (Class testClass : testCaseClasses) + { + testClassNames[i++] = testClass.getName(); + } + + // Create a coordinator and begin its test procedure. + Coordinator coordinator = + new Coordinator(repetitions, duration, threads, 0, params, null, reportDir, testRunName, verbose, brokerUrl, + virtualHost, engine, terminate, csvResults, xmlResults, decoratorFactories); + + TestResult testResult = coordinator.start(testClassNames); + + // Return different error codes, depending on whether or not there were test failures. + if (testResult.failureCount() > 0) + { + System.exit(FAILURE_EXIT); + } + else + { + System.exit(SUCCESS_EXIT); + } + } + catch (Exception e) + { + log.debug("Top level handler caught execption.", e); + console.info(e.getMessage()); + e.printStackTrace(); + System.exit(EXCEPTION_EXIT); + } + } + + /// + /// Starts all of the test classes to be run by this coordinator. + /// + /// An array of all the coordinating test case implementations. + /// + /// A JUnit TestResult to run the tests with. + /// + /// Any underlying exceptions are allowed to fall through, and fail the test process. + public TestResult start(String[] testClassNames) throws Exception + { + log.debug("public TestResult start(String[] testClassNames = " + Arrays.ToString(testClassNames) + ": called"); + + // Connect to the broker. + connection = TestUtils.createConnection(TestContextProperties.getInstance()); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination controlTopic = session.createTopic("iop.control"); + Destination responseQueue = session.createQueue("coordinator"); + + conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class); + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + connection.start(); + + // Broadcast the compulsory invitation to find out what clients are available to test. + Message invite = session.createMessage(); + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setJMSReplyTo(responseQueue); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection enlists = conversation.receiveAll(0, 500); + enlistedClients = extractEnlists(enlists); + + for (TestClientDetails client : enlistedClients) + { + log.debug("Got enlisted test client: " + client); + console.info("Test node " + client.clientName + " available."); + } + + // Start the clock reference service running. + UDPClockReference clockReference = new UDPClockReference(); + Thread clockRefThread = new Thread(clockReference); + registerShutdownHook(clockReference); + clockRefThread.start(); + + // Broadcast to all clients to synchronize their clocks against the coordinators clock reference. + Message clockSynchRequest = session.createMessage(); + clockSynchRequest.setStringProperty("CONTROL_TYPE", "CLOCK_SYNCH"); + + string localAddress = InetAddress.getByName(InetAddress.getLocalHost().getHostName()).getHostAddress(); + clockSynchRequest.setStringProperty("ADDRESS", localAddress); + + conversation.send(controlTopic, clockSynchRequest); + + // Run the test in the suite using JUnit. + TestResult result = null; + + for (string testClassName : testClassNames) + { + // Record the current test class, so that the test results can be output to a file incorporating this name. + this.currentTestClassName = testClassName; + + result = super.start(new String[] { testClassName }); + } + + // At this point in time, all tests have completed. Broadcast the shutdown message, if the termination option + // was set on the command line. + if (terminate) + { + Message terminate = session.createMessage(); + terminate.setStringProperty("CONTROL_TYPE", "TERMINATE"); + + conversation.send(controlTopic, terminate); + } + + return result; + } + + /// + /// For a collection of enlist messages, this method pulls out of the client details for the enlisting clients. + /// + /// The enlist messages. + /// + /// A set of enlisting clients, extracted from the enlist messages. + /// + /// Any underlying JMSException is allowed to fall through. + public static Set extractEnlists(Collection enlists) throws JMSException + { + log.debug("public static Set extractEnlists(Collection enlists = " + enlists + + "): called"); + + Set enlistedClients = new HashSet(); + + // Retain the list of all available clients. + for (Message enlist : enlists) + { + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = enlist.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = enlist.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + string replyType = enlist.getStringProperty("CONTROL_TYPE"); + + if ("ENLIST".equals(replyType)) + { + enlistedClients.add(clientDetails); + } + else if ("DECLINE".equals(replyType)) + { + log.debug("Test client " + clientDetails.clientName + " declined the invite."); + } + else + { + log.warn("Got an unknown reply type, " + replyType + ", to the invite."); + } + } + + return enlistedClients; + } + + /// + /// Runs a test or suite of tests, using the super class implemenation. This method wraps the test to be run + /// in any test decorators needed to add in the coordinators ability to invite test clients to participate in + /// tests. + /// + /// The test to run. + /// Undocumented. Nothing in the JUnit javadocs to say what this is for. + /// + /// The results of the test run. + public TestResult doRun(Test test, bool wait) + { + log.debug("public TestResult doRun(Test \"" + test + "\", bool " + wait + "): called"); + + // Wrap all tests in the test suite with WrappedSuiteTestDecorators. This is quite ugly and a bit baffling, + // but the reason it is done is because the JUnit implementation of TestDecorator has some bugs in it. + WrappedSuiteTestDecorator targetTest = null; + + if (test instanceof TestSuite) + { + log.debug("targetTest is a TestSuite"); + + TestSuite suite = (TestSuite) test; + + int numTests = suite.countTestCases(); + log.debug("There are " + numTests + " in the suite."); + + for (int i = 0; i < numTests; i++) + { + Test nextTest = suite.testAt(i); + log.debug("suite.testAt(" + i + ") = " + nextTest); + + if (nextTest instanceof FrameworkBaseCase) + { + log.debug("nextTest is a FrameworkBaseCase"); + } + } + + targetTest = new WrappedSuiteTestDecorator(suite); + log.debug("Wrapped with a WrappedSuiteTestDecorator."); + } + + // Apply any optional user specified decorators. + targetTest = applyOptionalUserDecorators(targetTest); + + // Wrap the tests in a suitable distributed test decorator, to perform the invite/test cycle. + targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + + // TestSuite suite = new TestSuite(); + // suite.addTest(targetTest); + + // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread. + // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 }); + + return super.doRun(targetTest, wait); + } + + /// + /// Creates a wrapped test decorator, that is capable of inviting enlisted clients to participate in a specified + /// test. This is the test engine that sets up the roles and sequences a distributed test case. + /// + /// The test decorator to wrap. + /// The enlisted clients available to run the test. + /// The conversation factory used to build conversation helper over the specified connection. + /// The connection to talk to the enlisted clients over. + /// + /// An invititing test decorator, that invites all the enlisted clients to participate in tests, in pairs. + protected DistributedTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, + Set enlistedClients, ConversationFactory conversationFactory, Connection connection) + { + switch (engine) + { + case FANOUT: + return new FanOutTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + case INTEROP: + default: + return new InteropTestDecorator(targetTest, enlistedClients, conversationFactory, connection); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using junit.framework.TestResult; + +using log4net; + +using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +using javax.jms.Connection; +using javax.jms.Destination; +using javax.jms.JMSException; +using javax.jms.Message; + +using java.util.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// DistributedTestDecorator is a base class for writing test decorators that invite test clients to participate in + /// distributed test cases. It provides a helper method, , that broadcasts an invitation and + /// returns the set of test clients that are available to particiapte in the test. + /// + ///

When used to wrap a test, it replaces the default implementations + /// with a suitable circuit factory for distributed tests. Concrete implementations can use this to configure the sending + /// and receiving roles on the test. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Broadcast test invitations and collect enlists. . + ///
+ ///

+ public abstract class DistributedTestDecorator extends WrappedSuiteTestDecorator + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(DistributedTestDecorator)); + + /// Holds the contact information for all test clients that are available and that may take part in the test. + Set allClients; + + /// Holds the conversation helper for the control level conversation for coordinating the test through. + ConversationFactory conversationFactory; + + /// Holds the connection that the control conversation is held over. + Connection connection; + + /// Holds the underlying test suite that this decorator wraps. + WrappedSuiteTestDecorator testSuite; + + /// Holds the control topic, on which test invitations are broadcast. + protected Destination controlTopic; + + /// + /// Creates a wrapped suite test decorator from another one. + /// + /// The test suite. + /// The list of all clients that responded to the compulsory invite. + /// The conversation helper for the control level, test coordination conversation. + /// The connection that the coordination messages are sent over. + public DistributedTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite); + + log.debug("public DistributedTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + + // Set up the test control topic. + try + { + controlTopic = conversationFactory.getSession().createTopic("iop.control"); + } + catch (JMSException e) + { + throw new RuntimeException("Unable to create the coordinating control topic to broadcast test invites on.", e); + } + } + + /// + /// Should run all of the tests in the wrapped test suite. + /// + /// The the results object to monitor the test results with. + public abstract void run(TestResult testResult); + + /// + /// Should provide the distributed test sequencer to pass to + /// tests. + /// + /// A distributed test sequencer. + public abstract CircuitFactory getTestSequencer(); + + /// + /// Broadcasts an invitation to participate in a coordinating test case to find out what clients are available to + /// run the test case. + /// + /// The coordinating test case to broadcast an inviate for. + /// + /// A set of test clients that accepted the invitation. + protected Set signupClients(FrameworkBaseCase coordTest) + { + // Broadcast the invitation to find out what clients are available to test. + Set enlists; + try + { + Message invite = conversationFactory.getSession().createMessage(); + + ConversationFactory.Conversation conversation = conversationFactory.startConversation(); + + invite.setStringProperty("CONTROL_TYPE", "INVITE"); + invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName())); + + conversation.send(controlTopic, invite); + + // Wait for a short time, to give test clients an opportunity to reply to the invitation. + Collection replies = conversation.receiveAll(allClients.size(), 500); + enlists = Coordinator.extractEnlists(replies); + } + catch (JMSException e) + { + throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e); + } + + return enlists; + } + + /// + /// Prints a string summarizing this test decorator, mainly for debugging purposes. + /// + /// string representation for debugging purposes. + public string ToString() + { + return "DistributedTestDecorator: [ testSuite = " + testSuite + " ]"; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using junit.framework.Test; +using junit.framework.TestResult; + +using log4net; + +using Apache.Qpid.Integration.Tests.framework.DropInTest; +using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; +using Apache.Qpid.Integration.Tests.framework.sequencers.FanOutCircuitFactory; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +using javax.jms.Connection; +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.MessageListener; + +using System.Collections.Generic.Collection; +using java.util.Iterator; +using java.util.Set; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// FanOutTestDecorator is an that runs one test client in the sender role, and the remainder + /// in the receivers role. It also has the capability to listen for new test cases joining the test beyond the initial start + /// point. This feature can be usefull when experimenting with adding more load, in the form of more test clients, to assess + /// its impact on a running test. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Execute coordinated test cases. + ///
Accept test clients joining a running test. + ///
+ ///

+ public class FanOutTestDecorator extends DistributedTestDecorator : MessageListener + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(FanOutTestDecorator)); + + /// Holds the currently running test case. + FrameworkBaseCase currentTest = null; + + /// + /// Creates a wrapped suite test decorator from another one. + /// + /// The test suite. + /// The list of all clients that responded to the compulsory invite. + /// The conversation helper for the control level, test coordination conversation. + /// The connection that the coordination messages are sent over. + public FanOutTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite, availableClients, controlConversation, controlConnection); + + log.debug("public DistributedTestDecorator(WrappedSuiteTestDecorator suite, Set allClients = " + + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called"); + + testSuite = suite; + allClients = availableClients; + conversationFactory = controlConversation; + connection = controlConnection; + + // Sign available clients up to the test. + for (Test test : getAllUnderlyingTests()) + { + FrameworkBaseCase coordTest = (FrameworkBaseCase) test; + + // Get all of the clients able to participate in the test. + Set enlists = signupClients(coordTest); + + // Check that there were some clients available. + if (enlists.size() == 0) + { + throw new RuntimeException("No clients to test with"); + } + + // Create a distributed test circuit factory for the test. + CircuitFactory circuitFactory = getTestSequencer(); + + // Set up the first client in the sender role, and the remainder in the receivers role. + Iterator clients = enlists.iterator(); + circuitFactory.setSender(clients.next()); + + while (clients.hasNext()) + { + // Set the sending and receiving client details on the test case. + circuitFactory.setReceiver(clients.next()); + } + + // Pass down the connection to hold the coordinating conversation over. + circuitFactory.setConversationFactory(conversationFactory); + + // If the current test case is a drop-in test, set it up as the currently running test for late joiners to + // add in to. Otherwise the current test field is set to null, to indicate that late joiners are not allowed. + currentTest = (coordTest instanceof DropInTest) ? coordTest : null; + + // Execute the test case. + coordTest.setCircuitFactory(circuitFactory); + } + } + + /// + /// Broadcasts a test invitation and accepts enlists from participating clients. The wrapped test cases are run + /// with one test client in the sender role, and the remaining test clients in the receiving role. + /// + ///

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime + /// exceptions, resulting in the non-completion of the test run. + ///

+ /// The the results object to monitor the test results with. + /// + /// Better error recovery for failure of the invite/enlist conversation could be added. + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + // Listen for late joiners on the control topic. + try + { + conversationFactory.getSession().createConsumer(controlTopic).setMessageListener(this); + } + catch (JMSException e) + { + throw new RuntimeException("Unable to set up the message listener on the control topic.", e); + } + + // Run all of the test cases in the test suite. + /*for (Test test : getAllUnderlyingTests()) + { + FrameworkBaseCase coordTest = (FrameworkBaseCase) test; + + // Get all of the clients able to participate in the test. + Set enlists = signupClients(coordTest); + + // Check that there were some clients available. + if (enlists.size() == 0) + { + throw new RuntimeException("No clients to test with"); + } + + // Create a distributed test circuit factory for the test. + CircuitFactory circuitFactory = getTestSequencer(); + + // Set up the first client in the sender role, and the remainder in the receivers role. + Iterator clients = enlists.iterator(); + circuitFactory.setSender(clients.next()); + + while (clients.hasNext()) + { + // Set the sending and receiving client details on the test case. + circuitFactory.setReceiver(clients.next()); + } + + // Pass down the connection to hold the coordinating conversation over. + circuitFactory.setConversationFactory(conversationFactory); + + // If the current test case is a drop-in test, set it up as the currently running test for late joiners to + // add in to. Otherwise the current test field is set to null, to indicate that late joiners are not allowed. + currentTest = (coordTest instanceof DropInTest) ? coordTest : null; + + // Execute the test case. + coordTest.setCircuitFactory(circuitFactory); + }*/ + + // Run all of the test cases in the test suite. + for (Test test : getAllUnderlyingTests()) + { + FrameworkBaseCase coordTest = (FrameworkBaseCase) test; + + coordTest.run(testResult); + + currentTest = null; + } + } + + /// + /// Should provide the distributed test sequencer to pass to + /// tests. + /// + /// A distributed test sequencer. + public CircuitFactory getTestSequencer() + { + return new FanOutCircuitFactory(); + } + + /// + /// Listens to incoming messages on the control topic. If the messages are 'join' messages, signalling a new + /// test client wishing to join the current test, then the new client will be added to the current test in the + /// receivers role. + /// + /// The incoming control message. + public void onMessage(Message message) + { + try + { + // Check if the message is from a test client attempting to join a running test, and join it to the current + // test case if so. + if (message.getStringProperty("CONTROL_TYPE").equals("JOIN") && (currentTest != null)) + { + ((DropInTest) currentTest).lateJoin(message); + } + } + // There is not a lot can be done with this error, so it is deliberately ignored. + catch (JMSException e) + { + log.debug("Unable to process message:" + message); + } + } + + /// + /// Prints a string summarizing this test decorator, mainly for debugging purposes. + /// + /// string representation for debugging purposes. + public string ToString() + { + return "FanOutTestDecorator: [ testSuite = " + testSuite + " ]"; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using junit.framework.Test; +using junit.framework.TestResult; + +using log4net; + +using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; +using Apache.Qpid.Integration.Tests.framework.sequencers.InteropCircuitFactory; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; + +using javax.jms.Connection; + +using java.util.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// DistributedTestDecorator is a test decorator, written to implement the interop test specification. Given a list + /// of enlisted test clients, that are available to run interop tests, this decorator invites them to participate + /// in each test in the wrapped test suite. Amongst all the clients that respond to the invite, all pairs are formed, + /// and each pairing (in both directions, but excluding the reflexive pairings) is split into a sender and receivers + /// role and a test case run between them. Any enlisted combinations that do not accept a test invite are automatically + /// failed. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Broadcast test invitations and collect enlists. . + ///
Output test failures for clients unwilling to run the test case. + ///
Execute distributed test cases. + ///
Fail non-participating pairings. + ///
+ ///

+ public class InteropTestDecorator extends DistributedTestDecorator + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(InteropTestDecorator)); + + /// + /// Creates a wrapped suite test decorator from another one. + /// + /// The test suite. + /// The list of all clients that responded to the compulsory invite. + /// The conversation helper for the control level, test coordination conversation. + /// The connection that the coordination messages are sent over. + public InteropTestDecorator(WrappedSuiteTestDecorator suite, Set availableClients, + ConversationFactory controlConversation, Connection controlConnection) + { + super(suite, availableClients, controlConversation, controlConnection); + } + + /// + /// Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is + /// then repeated for every combination of test clients (provided the wrapped test case extends + /// . + /// + ///

Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime exceptions, + /// resulting in the non-completion of the test run. + ///

+ /// Better error recovery for failure of the invite/enlist conversation could be added. + /// The the results object to monitor the test results with. + public void run(TestResult testResult) + { + log.debug("public void run(TestResult testResult): called"); + + Collection tests = testSuite.getAllUnderlyingTests(); + + for (Test test : getAllUnderlyingTests()) + { + FrameworkBaseCase coordTest = (FrameworkBaseCase) test; + + // Broadcast the invitation to find out what clients are available to test. + Set enlists = signupClients(coordTest); + + // Compare the list of willing clients to the list of all available. + Set optOuts = new HashSet(allClients); + optOuts.removeAll(enlists); + + // Output test failures for clients that will not particpate in the test. + Set> failPairs = allPairs(optOuts, allClients); + + for (List failPair : failPairs) + { + // Create a distributed test circuit factory for the test. + CircuitFactory circuitFactory = getTestSequencer(); + + // Create an automatic failure test for the opted out test pair. + FrameworkBaseCase failTest = new OptOutTestCase("testOptOut"); + circuitFactory.setSender(failPair.get(0)); + circuitFactory.setReceiver(failPair.get(1)); + failTest.setCircuitFactory(circuitFactory); + + failTest.run(testResult); + } + + // Loop over all combinations of clients, willing to run the test. + Set> enlistedPairs = allPairs(enlists, enlists); + + for (List enlistedPair : enlistedPairs) + { + // Create a distributed test circuit factory for the test. + CircuitFactory circuitFactory = getTestSequencer(); + + // Set the sending and receiving client details on the test circuitFactory. + circuitFactory.setSender(enlistedPair.get(0)); + circuitFactory.setReceiver(enlistedPair.get(1)); + + // Pass down the connection to hold the coordination conversation over. + circuitFactory.setConversationFactory(conversationFactory); + + // Execute the test case. + coordTest.setCircuitFactory(circuitFactory); + coordTest.run(testResult); + } + } + } + + /// + /// Should provide the distributed test sequencer to pass to + /// tests. + /// + /// A distributed test sequencer. + public CircuitFactory getTestSequencer() + { + return new InteropCircuitFactory(); + } + + /// + /// Produces all pairs of combinations of elements from two sets. The ordering of the elements in the pair is + /// important, that is the pair is distinct from ; both pairs are generated. For any element, i, in + /// both the left and right sets, the reflexive pair is not generated. + /// + /// The left set. + /// The right set. + /// @param The type of the content of the pairs. + ///
+ /// All pairs formed from the permutations of all elements of the left and right sets. + private Set> allPairs(Set left, Set right) + { + log.debug("private Set> allPairs(Set left = " + left + ", Set right = " + right + "): called"); + + Set> results = new HashSet>(); + + // Form all pairs from left to right. + // Form all pairs from right to left. + for (E le : left) + { + for (E re : right) + { + if (!le.equals(re)) + { + results.add(new Pair(le, re)); + results.add(new Pair(re, le)); + } + } + } + + log.debug("results = " + results); + + return results; + } + + /// A simple implementation of a pair, using a list. + private class Pair extends ArrayList + { + /// + /// Creates a new pair of elements. + /// + /// The first element. + /// The second element. + public Pair(T first, T second) + { + super(); + super.add(first); + super.add(second); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; +using Apache.Qpid.Integration.Tests.framework.FrameworkBaseCase; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// An OptOutTestCase is a test case that automatically fails. It is used when a list of test clients has been generated + /// from a compulsory invite, but only some of those clients have responded to a specific test case invite. The clients + /// that did not respond, may automatically be given a fail for some tests. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Fail the test with a suitable reason. + ///
+ ///

+ public class OptOutTestCase extends FrameworkBaseCase + { + /// + /// Creates a new coordinating test case with the specified name. + /// + /// The test case name. + public OptOutTestCase(string name) + { + super(name); + } + + /// Generates an appropriate test failure assertion. + public void testOptOut() + { + CircuitFactory circuitFactory = getCircuitFactory(); + + fail("One of " + circuitFactory.getSender() + " and " + getCircuitFactory().getReceivers() + + " opted out of the test."); + } + + /// + /// Should provide a translation from the junit method name of a test to its test case name as defined in the + /// interop testing specification. For example the method "testP2P" might map onto the interop test case name + /// "TC2_BasicP2P". + /// + /// The name of the JUnit test method. + /// The name of the corresponding interop test case. + public string getTestCaseNameForTestMethod(string methodName) + { + return "OptOutTest"; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using org.apache.log4j.NDC; + +using Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties; +using Apache.Qpid.Integration.Tests.framework.TestUtils; +using Apache.Qpid.Integration.Tests.framework.clocksynch.ClockSynchThread; +using Apache.Qpid.Integration.Tests.framework.clocksynch.UDPClockSynchronizer; +using org.apache.qpid.util.ReflectionUtils; +using org.apache.qpid.util.ReflectionUtilsException; + +using uk.co.thebadgerset.junit.extensions.SleepThrottle; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using javax.jms.*; + +using java.util.*; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// Implements a test client as described in the interop testing spec + /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that + /// reacts to control message sequences send by the test . + /// + ///

+ ///
Messages Handled by TestClient
Message Action + ///
Invite(compulsory) Reply with Enlist. + ///
Invite(test case) Reply with Enlist if test case available. + ///
AssignRole(test case) Reply with Accept Role if matches an enlisted test. Keep test parameters. + ///
Start Send test messages defined by test parameters. Send report on messages sent. + ///
Status Request Send report on messages received. + ///
Terminate Terminate the test client. + ///
ClockSynch Synch clock against the supplied UDP address. + ///
+ /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Handle all incoming control messages. + ///
Configure and look up test cases by name. + ///
+ ///

+ public class TestClient : MessageListener + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(TestClient)); + + /// Used for reporting to the console. + private static ILog console = LogManager.GetLogger("CONSOLE"); + + /// Holds the default identifying name of the test client. + public static final string CLIENT_NAME = "java"; + + /// Holds the URL of the broker to run the tests on. + public static string brokerUrl; + + /// Holds the virtual host to run the tests on. If null, then the default virtual host is used. + public static string virtualHost; + + /// + /// Holds the test context properties that provides the default test parameters, plus command line overrides. + /// This is initialized with the default test parameters, to which command line overrides may be applied. + /// + public static ParsedProperties testContextProperties = + TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /// Holds all the test cases loaded from the classpath. + Map testCases = new HashMap(); + + /// Holds the test case currently being run by this client. + protected TestClientControlledTest currentTestCase; + + /// Holds the connection to the broker that the test is being coordinated on. + protected Connection connection; + + /// Holds the message producer to hold the test coordination over. + protected MessageProducer producer; + + /// Holds the JMS controlSession for the test coordination. + protected Session session; + + /// Holds the name of this client, with a default value. + protected string clientName = CLIENT_NAME; + + /// This flag indicates that the test client should attempt to join the currently running test case on start up. + protected bool join; + + /// Holds the clock synchronizer for the test node. + ClockSynchThread clockSynchThread; + + /// + /// Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client + /// identifying name. + /// + /// The url of the broker to connect to. + /// The virtual host to conect to. + /// The client name to use. + /// Flag to indicate that this client should attempt to join running tests. + public TestClient(string pBrokerUrl, string pVirtualHost, string clientName, bool join) + { + log.debug("public TestClient(string pBrokerUrl = " + pBrokerUrl + ", string pVirtualHost = " + pVirtualHost + + ", string clientName = " + clientName + ", bool join = " + join + "): called"); + + // Retain the connection parameters. + brokerUrl = pBrokerUrl; + virtualHost = pVirtualHost; + this.clientName = clientName; + this.join = join; + } + + /// + /// The entry point for the interop test coordinator. This client accepts the following command line arguments: + /// + ///

+ ///
-b The broker URL. Optional. + ///
-h The virtual host. Optional. + ///
-n The test client name. Optional. + ///
name=value Trailing argument define name/value pairs. Added to system properties. Optional. + ///
+ ///

+ /// The command line arguments. + public static void main(String[] args) + { + log.debug("public static void main(String[] args = " + Arrays.ToString(args) + "): called"); + console.info("Qpid Distributed Test Client."); + + // Override the default broker url to be localhost:5672. + testContextProperties.setProperty(MessagingTestConfigProperties.BROKER_PROPNAME, "tcp://localhost:5672"); + + // Use the command line parser to evaluate the command line with standard handling behaviour (print errors + // and usage then exist if there are errors). + // Any options and trailing name=value pairs are also injected into the test context properties object, + // to override any defaults that may have been set up. + ParsedProperties options = + new ParsedProperties(uk.co.thebadgerset.junit.extensions.util.CommandLineParser.processCommandLine(args, + new uk.co.thebadgerset.junit.extensions.util.CommandLineParser( + new String[][] + { + { "b", "The broker URL.", "broker", "false" }, + { "h", "The virtual host to use.", "virtual host", "false" }, + { "o", "The name of the directory to output test timings to.", "dir", "false" }, + { "n", "The name of the test client.", "name", "false" }, + { "j", "Join this test client to running test.", "false" } + }), testContextProperties)); + + // Extract the command line options. + string brokerUrl = options.getProperty("b"); + string virtualHost = options.getProperty("h"); + string clientName = options.getProperty("n"); + clientName = (clientName == null) ? CLIENT_NAME : clientName; + bool join = options.getPropertyAsBoolean("j"); + + // To distinguish logging output set up an NDC on the client name. + NDC.push(clientName); + + // Create a test client and start it running. + TestClient client = new TestClient(brokerUrl, virtualHost, clientName, join); + + // Use a class path scanner to find all the interop test case implementations. + // Hard code the test classes till the classpath scanner is fixed. + Collection> testCaseClasses = + new ArrayList>(); + // ClasspathScanner.getMatches(TestClientControlledTest.class, "^TestCase.*", true); + testCaseClasses.addAll(loadTestCases("org.apache.qpid.interop.clienttestcases.TestCase1DummyRun", + "org.apache.qpid.interop.clienttestcases.TestCase2BasicP2P", + "org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub", + "org.apache.qpid.interop.clienttestcases.TestCase4P2PMessageSize", + "org.apache.qpid.interop.clienttestcases.TestCase5PubSubMessageSize", + "Apache.Qpid.Integration.Tests.framework.distributedcircuit.TestClientCircuitEnd")); + + try + { + client.start(testCaseClasses); + } + catch (Exception e) + { + log.error("The test client was unable to start.", e); + console.info(e.getMessage()); + System.exit(1); + } + } + + /// + /// Parses a list of class names, and loads them if they are available on the class path. + /// + /// The names of the classes to load. + /// + /// A list of the loaded test case classes. + public static IList> loadTestCases(String... classNames) + { + IList> testCases = + new LinkedList>(); + + for (string className : classNames) + { + try + { + Class cls = ReflectionUtils.forName(className); + testCases.add((Class) cls); + } + catch (ReflectionUtilsException e) + { + // Ignore, class could not be found, so test not available. + console.warn("Requested class " + className + " cannot be found, ignoring it."); + } + catch (ClassCastException e) + { + // Ignore, class was not of correct type to be a test case. + console.warn("Requested class " + className + " is not an instance of TestClientControlledTest."); + } + } + + return testCases; + } + + /// + /// Starts the interop test client running. This causes it to start listening for incoming test invites. + /// + /// The classes of the available test cases. The test case names from these are used to + /// matchin incoming test invites against. + /// + /// Any underlying JMSExceptions are allowed to fall through. + protected void start(Collection> testCaseClasses) throws JMSException + { + log.debug("protected void start(Collection> testCaseClasses = " + + testCaseClasses + "): called"); + + // Create all the test case implementations and index them by the test names. + for (Class nextClass : testCaseClasses) + { + try + { + TestClientControlledTest testCase = nextClass.newInstance(); + testCases.put(testCase.getName(), testCase); + } + catch (InstantiationException e) + { + log.warn("Could not instantiate test case class: " + nextClass.getName(), e); + // Ignored. + } + catch (IllegalAccessException e) + { + log.warn("Could not instantiate test case class due to illegal access: " + nextClass.getName(), e); + // Ignored. + } + } + + // Open a connection to communicate with the coordinator on. + connection = TestUtils.createConnection(testContextProperties); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set this up to listen for control messages. + Topic privateControlTopic = session.createTopic("iop.control." + clientName); + MessageConsumer consumer = session.createConsumer(privateControlTopic); + consumer.setMessageListener(this); + + Topic controlTopic = session.createTopic("iop.control"); + MessageConsumer consumer2 = session.createConsumer(controlTopic); + consumer2.setMessageListener(this); + + // Create a producer to send replies with. + producer = session.createProducer(null); + + // If the join flag was set, then broadcast a join message to notify the coordinator that a new test client + // is available to join the current test case, if it supports it. This message may be ignored, or it may result + // in this test client receiving a test invite. + if (join) + { + Message joinMessage = session.createMessage(); + + joinMessage.setStringProperty("CONTROL_TYPE", "JOIN"); + joinMessage.setStringProperty("CLIENT_NAME", clientName); + joinMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + producer.send(controlTopic, joinMessage); + } + + // Start listening for incoming control messages. + connection.start(); + } + + /// + /// Handles all incoming control messages. + /// + /// The incoming message. + public void onMessage(Message message) + { + NDC.push(clientName); + log.debug("public void onMessage(Message message = " + message + "): called"); + + try + { + string controlType = message.getStringProperty("CONTROL_TYPE"); + string testName = message.getStringProperty("TEST_NAME"); + + log.debug("Received control of type '" + controlType + "' for the test '" + testName + "'"); + + // Check if the message is a test invite. + if ("INVITE".equals(controlType)) + { + // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites + // for which test cases exist. + bool enlist = false; + + if (testName != null) + { + log.debug("Got an invite to test: " + testName); + + // Check if the requested test case is available. + TestClientControlledTest testCase = testCases.get(testName); + + if (testCase != null) + { + log.debug("Found implementing class for test '" + testName + "', enlisting for it."); + + // Check if the test case will accept the invitation. + enlist = testCase.acceptInvite(message); + + log.debug("The test case " + + (enlist ? " accepted the invite, enlisting for it." + : " did not accept the invite, not enlisting.")); + + // Make the requested test case the current test case. + currentTestCase = testCase; + } + else + { + log.debug("Received an invite to the test '" + testName + "' but this test is not known."); + } + } + else + { + log.debug("Got a compulsory invite, enlisting for it."); + + enlist = true; + } + + if (enlist) + { + // Reply with the client name in an Enlist message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "ENLIST"); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending enlist message '" + enlistMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + else + { + // Reply with the client name in an Decline message. + Message enlistMessage = session.createMessage(); + enlistMessage.setStringProperty("CONTROL_TYPE", "DECLINE"); + enlistMessage.setStringProperty("CLIENT_NAME", clientName); + enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName); + enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending decline message '" + enlistMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), enlistMessage); + } + } + else if ("ASSIGN_ROLE".equals(controlType)) + { + // Assign the role to the current test case. + string roleName = message.getStringProperty("ROLE"); + + log.debug("Got a role assignment to role: " + roleName); + + TestClientControlledTest.Roles role = Enum.valueOf(TestClientControlledTest.Roles.class, roleName); + + currentTestCase.assignRole(role, message); + + // Reply by accepting the role in an Accept Role message. + Message acceptRoleMessage = session.createMessage(); + acceptRoleMessage.setStringProperty("CLIENT_NAME", clientName); + acceptRoleMessage.setStringProperty("CONTROL_TYPE", "ACCEPT_ROLE"); + acceptRoleMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending accept role message '" + acceptRoleMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), acceptRoleMessage); + } + else if ("START".equals(controlType) || "STATUS_REQUEST".equals(controlType)) + { + if ("START".equals(controlType)) + { + log.debug("Got a start notification."); + + // Extract the number of test messages to send from the start notification. + int numMessages; + + try + { + numMessages = message.getIntProperty("MESSAGE_COUNT"); + } + catch (NumberFormatException e) + { + // If the number of messages is not specified, use the default of one. + numMessages = 1; + } + + // Start the current test case. + currentTestCase.start(numMessages); + } + else + { + log.debug("Got a status request."); + } + + // Generate the report from the test case and reply with it as a Report message. + Message reportMessage = currentTestCase.getReport(session); + reportMessage.setStringProperty("CLIENT_NAME", clientName); + reportMessage.setStringProperty("CONTROL_TYPE", "REPORT"); + reportMessage.setJMSCorrelationID(message.getJMSCorrelationID()); + + log.debug("Sending report message '" + reportMessage + "' to " + message.getJMSReplyTo()); + + producer.send(message.getJMSReplyTo(), reportMessage); + } + else if ("TERMINATE".equals(controlType)) + { + console.info("Received termination instruction from coordinator."); + + // Is a cleaner shutdown needed? + connection.close(); + System.exit(0); + } + else if ("CLOCK_SYNCH".equals(controlType)) + { + log.debug("Received clock synch command."); + string address = message.getStringProperty("ADDRESS"); + + log.debug("address = " + address); + + // Re-create (if necessary) and start the clock synch thread to synch the clock every ten seconds. + if (clockSynchThread != null) + { + clockSynchThread.terminate(); + } + + SleepThrottle throttle = new SleepThrottle(); + throttle.setRate(0.1f); + + clockSynchThread = new ClockSynchThread(new UDPClockSynchronizer(address), throttle); + clockSynchThread.start(); + } + else + { + // Log a warning about this but otherwise ignore it. + log.warn("Got an unknown control message, controlType = " + controlType + ", message = " + message); + } + } + catch (JMSException e) + { + // Log a warning about this, but otherwise ignore it. + log.warn("Got JMSException whilst handling message: " + message, e); + } + // Log any runtimes that fall through this message handler. These are fatal errors for the test client. + catch (RuntimeException e) + { + log.error("The test client message handler got an unhandled exception: ", e); + console.info("The message handler got an unhandled exception, terminating the test client."); + System.exit(1); + } + finally + { + NDC.pop(); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.MessageListener; +using javax.jms.Session; + +namespace Apache.Qpid.Integration.Tests.framework.distributedtesting +{ + /// + /// TestClientControlledTest provides an interface that classes implementing test cases to run on a + /// node can use. Implementations must be Java beans, that is, to provide a default constructor and to implement the + /// method. + /// + ///

The methods specified in this interface are called when the receives control instructions to + /// apply to the test. There are control instructions to present the test case with the test invite, so that it may + /// choose whether or not to participate in the test, assign the test to play the sender or receiver role, start the + /// test and obtain the test status report. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Supply the name of the test case that this implements. + ///
Accept/Reject invites based on test parameters. + ///
Adapt to assigned roles. + ///
Perform test case actions. + ///
Generate test reports. + ///
+ ///

+ public interface TestClientControlledTest + { + /// Defines the possible test case roles that an interop test case can take on. + public enum Roles + { + /// Specifies the sender role. + SENDER, + + /// Specifies the receivers role. + RECEIVER + } + + /// + /// Should provide the name of the test case that this class implements. The exact names are defined in the + /// interop testing spec. + /// + /// The name of the test case that this implements. + public string getName(); + + /// + /// Determines whether the test invite that matched this test case is acceptable. + /// + /// The invitation to accept or reject. + /// + /// true to accept the invitation, false to reject it. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public bool acceptInvite(Message inviteMessage) throws JMSException; + + /// + /// Assigns the role to be played by this test case. The test parameters are fully specified in the + /// assignment message. When this method return the test case will be ready to execute. + /// + /// The role to be played; sender or receivers. + /// The role assingment message, contains the full test parameters. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException; + + /// + /// Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + /// + /// The number of test messages to send. + /// + /// Any JMSException resulting from reading the message are allowed to fall through. + public void start(int numMessages) throws JMSException; + + /// + /// Gets a report on the actions performed by the test case in its assigned role. + /// + /// The controlSession to create the report message in. + /// + /// The report message. + /// + /// Any JMSExceptions resulting from creating the report are allowed to fall through. + public Message getReport(Session session) throws JMSException; + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using javax.jms.JMSException; +using javax.jms.Message; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A DropIn test is a test case that can accept late joining test clients into a running test. This can be usefull, + /// for interactive experimentation. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Accept late joining test clients. + ///
+ ///

+ public interface DropInTest + { + /// + /// Should accept a late joining client into a running test case. The client will be enlisted with a control message + /// with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields: + /// + ///

+ ///
CLIENT_NAME A unique name for the new client. + ///
CLIENT_PRIVATE_CONTROL_KEY The key for the route on which the client receives its control messages. + ///
+ ///

+ /// The late joiners join message. + /// + /// Any JMS Exception are allowed to fall through, indicating that the join failed. + public void lateJoin(Message message) throws JMSException; + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using javax.jms.ExceptionListener; +using javax.jms.JMSException; + +using java.io.PrintWriter; +using java.io.StringWriter; +using java.util.ArrayList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// An exception monitor, listens for JMS exception on a connection or consumer. It record all exceptions that it receives + /// and provides methods to test the number and type of exceptions received. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Record all exceptions received. + ///
+ ///

+ public class ExceptionMonitor : ExceptionListener + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(ExceptionMonitor)); + + /// Holds the received exceptions. + IList exceptions = new ArrayList(); + + /// + /// Receives incoming exceptions. + /// + /// The exception to record. + public synchronized void onException(JMSException e) + { + log.debug("public void onException(JMSException e): called", e); + + exceptions.add(e); + } + + /// + /// Checks that no exceptions have been received. + /// + /// true if no exceptions have been received, false otherwise. + public synchronized bool assertNoExceptions() + { + return exceptions.isEmpty(); + } + + /// + /// Checks that exactly one exception has been received. + /// + /// true if exactly one exception been received, false otherwise. + public synchronized bool assertOneJMSException() + { + return exceptions.size() == 1; + } + + /// + /// Checks that exactly one exception, with a linked cause of the specified type, has been received. + /// + /// The type of the linked cause. + /// + /// true if exactly one exception, with a linked cause of the specified type, been received, + /// false otherwise. + public synchronized bool assertOneJMSExceptionWithLinkedCause(Class aClass) + { + if (exceptions.size() == 1) + { + Exception e = exceptions.get(0); + + if (e instanceof JMSException) + { + JMSException jmse = (JMSException) e; + + Exception linkedCause = jmse.getLinkedException(); + + if ((linkedCause != null) && aClass.isInstance(linkedCause)) + { + return true; + } + } + } + + return false; + } + + /// + /// Checks that at least one exception of the the specified type, has been received. + /// + /// The type of the exception. + /// + /// true if at least one exception of the specified type has been received, false otherwise. + public synchronized bool assertExceptionOfType(Class exceptionClass) + { + // Start by assuming that the exception has no been received. + bool passed = false; + + // Scan all the exceptions for a match. + for (Exception e : exceptions) + { + if (exceptionClass.isInstance(e)) + { + passed = true; + + break; + } + } + + return passed; + } + + /// + /// Reports the number of exceptions held by this monitor. + /// + /// The number of exceptions held by this monitor. + public synchronized int size() + { + return exceptions.size(); + } + + /// + /// Clears the record of received exceptions. + /// + public synchronized void reset() + { + exceptions = new ArrayList(); + } + + /// + /// Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly + /// use for debugging/test failure reporting purposes. + /// + /// A string containing a dump of the stack traces of all exceptions. + public synchronized string ToString() + { + string result = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n"; + + for (Exception ex : exceptions) + { + result += getStackTrace(ex) + "\n"; + } + + return result; + } + + /// + /// Prints an exception stack trace into a string. + /// + /// The throwable to get the stack trace from. + /// + /// A string containing the throwables stack trace. + public static string getStackTrace(Throwable t) + { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw, true); + t.printStackTrace(pw); + pw.flush(); + sw.flush(); + + return sw.ToString(); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using org.apache.log4j.NDC; + +using Apache.Qpid.Integration.Tests.framework.BrokerLifecycleAware; +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; + +using uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +using uk.co.thebadgerset.junit.extensions.SetupTaskAware; +using uk.co.thebadgerset.junit.extensions.SetupTaskHandler; +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +using uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +using java.util.ArrayList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// FrameworkBaseCase provides a starting point for writing test cases against the test framework. Its main purpose is + /// to provide some convenience methods for testing. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Create and clean up in-vm brokers on every test case. + ///
Produce lists of assertions from assertion creation calls. + ///
Produce JUnit failures from assertion failures. + ///
Convert failed assertions to error messages. + ///
+ ///

+ public class FrameworkBaseCase extends AsymptoticTestCase : FrameworkTestContext, SetupTaskAware, + BrokerLifecycleAware + { + /// Used for debugging purposes. + private static ILog log = LogManager.GetLogger(typeof(FrameworkBaseCase)); + + /// Holds the test sequencer to create and run test circuits with. + protected CircuitFactory circuitFactory = new LocalCircuitFactory(); + + /// Used to read the tests configurable properties through. + protected ParsedProperties testProps; + + /// A default setup task processor to delegate setup tasks to. + protected SetupTaskHandler taskHandler = new SetupTaskHandler(); + + /// Flag used to track whether the test is in-vm or not. + protected bool isUsingInVM; + + /// Holds the failure mechanism. + protected CauseFailure failureMechanism = new CauseFailureUserPrompt(); + + /// + /// Creates a new test case with the specified name. + /// + /// The test case name. + public FrameworkBaseCase(string name) + { + super(name); + } + + /// + /// Returns the test case sequencer that provides test circuit, and test sequence implementations. The sequencer + /// that this base case returns by default is suitable for running a test circuit with both circuit ends colocated + /// on the same JVM. + /// + /// The test case sequencer. + protected CircuitFactory getCircuitFactory() + { + return circuitFactory; + } + + /// + /// Overrides the default test circuit factory. Test decorators can use this to supply distributed test sequencers or + /// other test circuit factory specializations. + /// + /// The new test circuit factory. + public void setCircuitFactory(CircuitFactory circuitFactory) + { + this.circuitFactory = circuitFactory; + } + + /// + /// Reports the current test case name. + /// + /// The current test case name. + public TestCaseVector getTestCaseVector() + { + return new TestCaseVector(this.getName(), 0); + } + + /// + /// Reports the current test case parameters. + /// + /// The current test case parameters. + public MessagingTestConfigProperties getTestParameters() + { + return new MessagingTestConfigProperties(testProps); + } + + /// + /// Creates a list of assertions. + /// + /// The assertions to compile in a list. + /// + /// A list of assertions. + protected IList assertionList(Assertion... asserts) + { + IList result = new ArrayList(); + + for (Assertion assertion : asserts) + { + result.add(assertion); + } + + return result; + } + + /// + /// Generates a JUnit assertion exception (failure) if any assertions are passed into this method, also concatenating + /// all of the error messages in the assertions together to form an error message to diagnose the test failure with. + /// + /// The list of failed assertions. + protected static void assertNoFailures(List asserts) + { + log.debug("protected void assertNoFailures(List asserts = " + asserts + "): called"); + + // Check if there are no assertion failures, and return without doing anything if so. + if ((asserts == null) || asserts.isEmpty()) + { + return; + } + + // Compile all of the assertion failure messages together. + string errorMessage = assertionsToString(asserts); + + // Fail with the error message from all of the assertions. + fail(errorMessage); + } + + /// + /// Converts a list of failed assertions into an error message. + /// + /// The failed assertions. + /// + /// The error message. + protected static string assertionsToString(List asserts) + { + string errorMessage = ""; + + for (Assertion assertion : asserts) + { + errorMessage += assertion.ToString() + "\n"; + } + + return errorMessage; + } + + /// + /// Ensures that the in-vm broker is created and initialized. + /// + /// + /// Any exceptions allowed to fall through and fail the test. + protected void setUp() throws Exception + { + NDC.push(getName()); + + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + // Process all optional setup tasks. This may include in-vm broker creation, if a decorator has added it. + taskHandler.runSetupTasks(); + } + + /// Ensures that the in-vm broker is cleaned up after each test run. + protected void tearDown() + { + NDC.pop(); + + // Process all optional tear down tasks. This may include in-vm broker clean up, if a decorator has added it. + taskHandler.runTearDownTasks(); + } + + /// + /// Adds the specified task to the tests setup. + /// + /// The task to add to the tests setup. + public void chainSetupTask(Runnable task) + { + taskHandler.chainSetupTask(task); + } + + /// + /// Adds the specified task to the tests tear down. + /// + /// The task to add to the tests tear down. + public void chainTearDownTask(Runnable task) + { + taskHandler.chainTearDownTask(task); + } + + /// + /// Should provide a translation from the junit method name of a test to its test case name as known to the test + /// clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + /// case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + /// name "TC2_BasicP2P". + /// + /// The name of the JUnit test method. + /// + /// The name of the corresponding interop test case. + public string getTestCaseNameForTestMethod(string methodName) + { + return methodName; + } + + public void setInVmBrokers() + { + isUsingInVM = true; + } + + /// + /// Indicates whether or not a test case is using in-vm brokers. + /// + /// true if the test is using in-vm brokers, false otherwise. + public bool usingInVmBroker() + { + return isUsingInVM; + } + + /// + /// Sets the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public void setLiveBroker(int i) + { } + + /// + /// Reports the currently live in-vm broker. + /// + /// The currently live in-vm broker. + public int getLiveBroker() + { + return 0; + } + + /// + /// Accepts a failure mechanism. + /// + /// The failure mechanism. + public void setFailureMechanism(CauseFailure failureMechanism) + { + this.failureMechanism = failureMechanism; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + + /// + /// A FrameworkTestContext provides context information to test code about the current test case being run; its name, its + /// parameters. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide the name of the current test case. + ///
Provide the test parameters. + ///
+ ///

+ public interface FrameworkTestContext + { + /// + /// Reports the current test case name. + /// + /// The current test case name. + TestCaseVector getTestCaseVector(); + + /// + /// Reports the current test case parameters. + /// + /// The current test case parameters. + MessagingTestConfigProperties getTestParameters(); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.*; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.*; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; + +namespace Apache.Qpid.Integration.Tests.framework.localcircuit +{ + /// + /// LocalCircuitImpl provides an implementation of the test circuit. This is a local only circuit implementation that + /// supports a single producer/consumer on each end of the circuit, with both ends of the circuit on the same JVM. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Supply the publishing and receiving ends of a test messaging circuit. + /// , + ///
Start the circuit running. + ///
Close the circuit down. + ///
Take a reading of the circuits state. + ///
Apply assertions against the circuits state. + ///
Send test messages over the circuit. + ///
Perform the default test procedure on the circuit. + ///
Provide access to connection and controlSession exception monitors. + ///
+ ///

+ public class LocalCircuitImpl : Circuit + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(LocalCircuitImpl)); + + /// Holds the test configuration for the circuit. + private ParsedProperties testProps; + + /// Holds the publishing end of the circuit. + private LocalPublisherImpl publisher; + + /// Holds the receiving end of the circuit. + private LocalReceiverImpl receiver; + + /// Holds the connection for the publishing end of the circuit. + private Connection connection; + + /// Holds the exception listener for the connection on the publishing end of the circuit. + private ExceptionMonitor connectionExceptionMonitor; + + /// Holds the exception listener for the controlSession on the publishing end of the circuit. + private ExceptionMonitor exceptionMonitor; + + /// + /// Creates a test circuit using the specified test parameters. The publisher, receivers, connection and + /// connection monitor must already have been created, to assemble the circuit. + /// + /// The test parameters. + /// The test publisher. + /// The test receivers. + /// The connection. + /// The connection exception monitor. + public LocalCircuitImpl(ParsedProperties testProps, LocalPublisherImpl publisher, LocalReceiverImpl receiver, + Connection connection, ExceptionMonitor connectionExceptionMonitor) + { + this.testProps = testProps; + this.publisher = publisher; + this.receiver = receiver; + this.connection = connection; + this.connectionExceptionMonitor = connectionExceptionMonitor; + this.exceptionMonitor = new ExceptionMonitor(); + + // Set this as the parent circuit on the publisher and receivers. + publisher.setCircuit(this); + receiver.setCircuit(this); + } + + /// + /// Gets the interface on the publishing end of the circuit. + /// + /// The publishing end of the circuit. + public Publisher getPublisher() + { + return publisher; + } + + /// + /// Gets the local publishing circuit end, for direct manipulation. + /// + /// The local publishing circuit end. + public CircuitEnd getLocalPublisherCircuitEnd() + { + return publisher; + } + + /// + /// Gets the interface on the receiving end of the circuit. + /// + /// The receiving end of the circuit. + public Receiver getReceiver() + { + return receiver; + } + + /// + /// Gets the local receiving circuit end, for direct manipulation. + /// + /// The local receiving circuit end. + public CircuitEnd getLocalReceiverCircuitEnd() + { + return receiver; + } + + /// + /// Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit, + /// into a report, against which assertions may be checked. + /// + public void check() + { } + + /// + /// Applied a list of assertions against the test circuit. The method should be called before doing + /// this, to ensure that the circuit has gathered its state into a report to assert against. + /// + /// The list of assertions to apply. + /// Any assertions that failed. + public IList applyAssertions(List assertions) + { + IList failures = new LinkedList(); + + for (Assertion assertion : assertions) + { + if (!assertion.apply()) + { + failures.add(assertion); + } + } + + return failures; + } + + /// Connects and starts the circuit. After this method is called the circuit is ready to send messages. + public void start() + { } + + /// Closes the circuit. All associated resources are closed. + public void close() + { + try + { + publisher.close(); + receiver.close(); + connection.close(); + } + catch (JMSException e) + { + throw new RuntimeException("Got JMSException during close:" + e.getMessage(), e); + } + } + + /// Sends a message on the test circuit. The exact nature of the message sent is controlled by the test parameters. + protected void send() + { + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps); + + bool transactional = props.getPublisherTransacted(); + bool rollback = props.getRollbackPublisher(); + + // Send a message through the publisher and log any exceptions raised. + try + { + CircuitEnd end = getLocalPublisherCircuitEnd(); + + end.send(createTestMessage(end)); + + if (rollback) + { + end.getSession().rollback(); + } + else if (transactional) + { + end.getSession().commit(); + } + } + catch (JMSException e) + { + exceptionMonitor.onException(e); + } + } + + /// + /// Runs the default test procedure against the circuit, and checks that all of the specified assertions hold. The + /// outline of the default test procedure is: + /// + ///

+        /// Start the circuit.
+        /// Send test messages.
+        /// Request a status report.
+        /// Assert conditions on the publishing end of the circuit.
+        /// Assert conditions on the receiving end of the circuit.
+        /// Close the circuit.
+        /// Pass with no failed assertions or fail with a list of failed assertions.
+        /// 
+ ///
+ /// The number of messages to send using the default test procedure. + /// The list of assertions to apply. + /// Any assertions that failed. + public IList test(int numMessages, List assertions) + { + // Start the test circuit. + start(); + + // Send the requested number of test messages. + for (int i = 0; i < numMessages; i++) + { + send(); + } + + // Inject a short pause to allow time for exceptions to come back asynchronously. + TestUtils.pause(500L); + + // Request a status report. + check(); + + // Clean up the publisher/receivers/controlSession/connections. + close(); + + // Apply all of the requested assertions, keeping record of any that fail. + IList failures = applyAssertions(assertions); + + // Return any failed assertions to the caller. + return failures; + } + + /// + /// Creates a message with the properties defined as per the test parameters. + /// + /// The circuit end to create the message on. + /// + /// The test message. + /// + /// Any JMSException occurring during creation of the message is allowed to fall through. + private Message createTestMessage(CircuitEnd client) throws JMSException + { + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps); + + return TestUtils.createTestMessageOfSize(client.getSession(), props.getMessageSize()); + } + + /// + /// Gets the exception monitor for the publishing ends connection. + /// + /// The exception monitor for the publishing ends connection. + public ExceptionMonitor getConnectionExceptionMonitor() + { + return connectionExceptionMonitor; + } + + /// + /// Gets the exception monitor for the publishing ends controlSession. + /// + /// The exception monitor for the publishing ends controlSession. + public ExceptionMonitor getExceptionMonitor() + { + return exceptionMonitor; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.*; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.MessageConsumer; +using javax.jms.MessageProducer; +using javax.jms.Session; + +namespace Apache.Qpid.Integration.Tests.framework.localcircuit +{ + /// + /// Provides an implementation of the interface and wraps a single message producer and consumer on + /// a single controlSession, as a . A local publisher also acts as a circuit end, because for a locally + /// located circuit the assertions may be applied directly, there does not need to be any inter-process messaging + /// between the publisher and its single circuit end, in order to ascertain its status. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
Provide assertion that the publisher received no exceptions. + ///
Provide assertion that the publisher received a no consumers error code. + ///
Provide assertion that the publisher received a no route error code. + ///
+ ///

+ public class LocalPublisherImpl extends CircuitEndBase : Publisher + { + /// Holds a reference to the containing circuit. + protected LocalCircuitImpl circuit; + + /// + /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also configured + /// for messages and exceptions received by the circuit end. + /// + /// The message producer for the circuit end point. + /// The message consumer for the circuit end point. + /// The controlSession for the circuit end point. + /// The monitor to notify of all messages received by the circuit end. + /// The monitor to notify of all exceptions received by the circuit end. + public LocalPublisherImpl(MessageProducer producer, MessageConsumer consumer, Session session, + MessageMonitor messageMonitor, ExceptionMonitor exceptionMonitor) + { + super(producer, consumer, session, messageMonitor, exceptionMonitor); + } + + /// + /// Creates a circuit end point from the producer, consumer and controlSession in a circuit end base implementation. + /// + /// The circuit end base implementation to take producers and consumers from. + public LocalPublisherImpl(CircuitEndBase end) + { + super(end.getProducer(), end.getConsumer(), end.getSession(), end.getMessageMonitor(), end.getExceptionMonitor()); + } + + /// Provides an assertion that the publisher encountered no exceptions. + /// + /// The test configuration properties. + /// + /// An assertion that the publisher encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps) + { + return new AssertionBase() + { + public bool apply() + { + bool passed = true; + ExceptionMonitor sessionExceptionMonitor = circuit.getExceptionMonitor(); + ExceptionMonitor connectionExceptionMonitor = circuit.getConnectionExceptionMonitor(); + + if (!connectionExceptionMonitor.assertNoExceptions()) + { + passed = false; + + addError("Was expecting no exceptions.\n"); + addError("Got the following exceptions on the connection, " + + circuit.getConnectionExceptionMonitor()); + } + + if (!sessionExceptionMonitor.assertNoExceptions()) + { + passed = false; + + addError("Was expecting no exceptions.\n"); + addError("Got the following exceptions on the producer, " + circuit.getExceptionMonitor()); + } + + return passed; + } + }; + } + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Provides an assertion that the publisher got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. + /// + /// An assertion that the publisher got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, final Class exceptionClass) + { + return new AssertionBase() + { + public bool apply() + { + bool passed = true; + ExceptionMonitor connectionExceptionMonitor = circuit.getConnectionExceptionMonitor(); + + if (!connectionExceptionMonitor.assertExceptionOfType(exceptionClass)) + { + passed = false; + + addError("Was expecting linked exception type " + exceptionClass.getName() + + " on the connection.\n"); + addError((connectionExceptionMonitor.size() > 0) + ? ("Actually got the following exceptions on the connection, " + connectionExceptionMonitor) + : "Got no exceptions on the connection."); + } + + return passed; + } + }; + } + + /// + /// Sets the contianing circuit. + /// + /// The containing circuit. + public void setCircuit(LocalCircuitImpl circuit) + { + this.circuit = circuit; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.*; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.MessageConsumer; +using javax.jms.MessageProducer; +using javax.jms.Session; + +namespace Apache.Qpid.Integration.Tests.framework.localcircuit +{ + /// + /// Provides an implementation of the interface that wraps a single message producer and consumer on + /// a single controlSession, as a . A local receiver also acts as a circuit end, because for a locally + /// located circuit the assertions may be applied directly, there does not need to be any inter process messaging + /// between the publisher and its single circuit end, in order to ascertain its status. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide a message producer for sending messages. + ///
Provide a message consumer for receiving messages. + ///
Provide assertion that the receivers received no exceptions. + ///
Provide assertion that the receivers received all test messages sent to it. + ///
+ ///

+ public class LocalReceiverImpl extends CircuitEndBase : Receiver + { + /// Holds a reference to the containing circuit. + private LocalCircuitImpl circuit; + + /// + /// Creates a circuit end point on the specified producer, consumer and controlSession. Monitors are also configured + /// for messages and exceptions received by the circuit end. + /// + /// The message producer for the circuit end point. + /// The message consumer for the circuit end point. + /// The controlSession for the circuit end point. + /// The monitor to notify of all messages received by the circuit end. + /// The monitor to notify of all exceptions received by the circuit end. + public LocalReceiverImpl(MessageProducer producer, MessageConsumer consumer, Session session, + MessageMonitor messageMonitor, ExceptionMonitor exceptionMonitor) + { + super(producer, consumer, session, messageMonitor, exceptionMonitor); + } + + /// + /// Creates a circuit end point from the producer, consumer and controlSession in a circuit end base implementation. + /// + /// The circuit end base implementation to take producers and consumers from. + public LocalReceiverImpl(CircuitEndBase end) + { + super(end.getProducer(), end.getConsumer(), end.getSession(), end.getMessageMonitor(), end.getExceptionMonitor()); + } + + /// + /// Provides an assertion that the receivers encountered no exceptions. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Provides an assertion that the receivers got all messages that were sent to it. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers got all messages that were sent to it. + public Assertion allMessagesReceivedAssertion(ParsedProperties testProps) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Provides an assertion that the receivers got none of the messages that were sent to it. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers got none of the messages that were sent to it. + public Assertion noMessagesReceivedAssertion(ParsedProperties testProps) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Provides an assertion that the receiver got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. An assertion that the receiver got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass) + { + return new NotApplicableAssertion(testProps); + } + + /// + /// Sets the contianing circuit. + /// + /// The containing circuit. + public void setCircuit(LocalCircuitImpl circuit) + { + this.circuit = circuit; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalCircuitImpl; +using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalPublisherImpl; +using Apache.Qpid.Integration.Tests.framework.localcircuit.LocalReceiverImpl; +using Apache.Qpid.Integration.Tests.framework.sequencers.CircuitFactory; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.*; + +using System.Collections.Generic.IList; +using java.util.Properties; +using java.util.concurrent.atomic.AtomicLong; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// LocalCircuitFactory is a circuit factory that creates test circuits with publishing and receiving ends rooted + /// on the same JVM. The ends of the circuit are presented as and interfaces, which + /// in turn provide methods to apply assertions to the circuit. The creation of the circuit ends, and the presentation + /// of the ends as publisher/receiver interfaces, are designed to be overriden, so that circuits and assertions that + /// use messaging features not available in JMS can be written. This provides an extension point for writing tests + /// against proprietary features of JMS implementations. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide a standard test procedure over a test circuit. + ///
Construct test circuits appropriate to a tests context. + ///
+ ///

+ public class LocalCircuitFactory : CircuitFactory + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(LocalCircuitFactory)); + + /// Used to create unique destination names for each test. + protected static AtomicLong uniqueDestsId = new AtomicLong(); + + /// + /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles, + /// begining the test and gathering the test reports from the participants. + /// + /// The test circuit. + /// The list of assertions to apply to the test circuit. + /// The test case definition. + public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties) + { + FrameworkBaseCase.assertNoFailures(testCircuit.test(1, assertions)); + } + + /// + /// Creates a test circuit for the test, configered by the test parameters specified. + /// + /// The test parameters. + /// + /// A test circuit. + public Circuit createCircuit(ParsedProperties testProperties) + { + Circuit result; + + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProperties); + + // Create a standard publisher/receivers test client pair on a shared connection, individual sessions. + try + { + // Get a unique offset to append to destination names to make them unique to the connection. + long uniqueId = uniqueDestsId.incrementAndGet(); + + // Set up the connection. + Connection connection = TestUtils.createConnection(testProperties); + + // Add the connection exception listener to assert on exception conditions with. + // ExceptionMonitor exceptionMonitor = new ExceptionMonitor(); + // connection.setExceptionListener(exceptionMonitor); + + // Set up the publisher. + CircuitEndBase publisherEnd = createPublisherCircuitEnd(connection, props, uniqueId); + + // Set up the receiver. + CircuitEndBase receiverEnd = createReceiverCircuitEnd(connection, props, uniqueId); + + // Start listening for incoming messages. + connection.start(); + + // Namespace everything up. + LocalPublisherImpl publisher = createPublisherFromCircuitEnd(publisherEnd); + LocalReceiverImpl receiver = createReceiverFromCircuitEnd(receiverEnd); + + result = new LocalCircuitImpl(testProperties, publisher, receiver, connection, publisher.getExceptionMonitor()); + } + catch (JMSException e) + { + throw new RuntimeException("Could not create publisher/receivers pair due to a JMSException.", e); + } + + return result; + } + + /// + /// Creates a local from a . Sub-classes may override this to provide more + /// specialized receivers if necessary. + /// + /// The receiving circuit end. + /// + /// A . + protected LocalReceiverImpl createReceiverFromCircuitEnd(CircuitEndBase receiverEnd) + { + return new LocalReceiverImpl(receiverEnd); + } + + /// + /// Creates a local from a . Sub-classes may override this to provide more + /// specialized receivers if necessary. + /// + /// The publishing circuit end. + /// + /// A . + protected LocalPublisherImpl createPublisherFromCircuitEnd(CircuitEndBase publisherEnd) + { + return new LocalPublisherImpl(publisherEnd); + } + + /// + /// Builds a circuit end suitable for the publishing side of a test circuit, from standard test parameters. + /// + /// The connection to build the circuit end on. + /// The test parameters to configure the circuit end construction. + /// A unique number to being numbering destinations from, to make this circuit unique. + /// + /// A circuit end suitable for the publishing side of a test circuit. + /// + /// Any underlying JMSExceptions are allowed to fall through and fail the creation. + public CircuitEndBase createPublisherCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId) + throws JMSException + { + log.debug( + "public CircuitEndBase createPublisherCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId = " + + uniqueId + "): called"); + + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps); + + // Check that the test properties do not contain AMQP/Qpid specific settings, and fail if they do. + if (props.getImmediate() || props.getMandatory()) + { + throw new RuntimeException( + "Cannot create a pure JMS circuit as the test properties require AMQP specific options."); + } + + Session session = connection.createSession(props.getPublisherTransacted(), props.getAckMode()); + + Destination destination = + props.getPubsub() ? session.createTopic(props.getSendDestinationNameRoot() + "_" + uniqueId) + : session.createQueue(props.getSendDestinationNameRoot() + "_" + uniqueId); + + MessageProducer producer = props.getPublisherProducerBind() ? session.createProducer(destination) : null; + + MessageConsumer consumer = + props.getPublisherConsumerBind() + ? session.createConsumer(session.createQueue(props.getReceiveDestinationNameRoot() + "_" + uniqueId)) : null; + + MessageMonitor messageMonitor = new MessageMonitor(); + + if (consumer != null) + { + consumer.setMessageListener(messageMonitor); + } + + ExceptionMonitor exceptionMonitor = new ExceptionMonitor(); + connection.setExceptionListener(exceptionMonitor); + + if (!props.getPublisherConsumerActive() && (consumer != null)) + { + consumer.close(); + } + + return new CircuitEndBase(producer, consumer, session, messageMonitor, exceptionMonitor); + } + + /// + /// Builds a circuit end suitable for the receiving side of a test circuit, from standard test parameters. + /// + /// The connection to build the circuit end on. + /// The test parameters to configure the circuit end construction. + /// A unique number to being numbering destinations from, to make this circuit unique. + /// + /// A circuit end suitable for the receiving side of a test circuit. + /// + /// Any underlying JMSExceptions are allowed to fall through and fail the creation. + public CircuitEndBase createReceiverCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId) + throws JMSException + { + log.debug( + "public CircuitEndBase createReceiverCircuitEnd(Connection connection, ParsedProperties testProps, long uniqueId = " + + uniqueId + "): called"); + + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProps); + + // Check that the test properties do not contain AMQP/Qpid specific settings, and fail if they do. + if (props.getImmediate() || props.getMandatory()) + { + throw new RuntimeException( + "Cannot create a pure JMS circuit as the test properties require AMQP specific options."); + } + + Session session = connection.createSession(props.getPublisherTransacted(), props.getAckMode()); + + MessageProducer producer = + props.getReceiverProducerBind() + ? session.createProducer(session.createQueue(props.getReceiveDestinationNameRoot() + "_" + uniqueId)) : null; + + Destination destination = + props.getPubsub() ? session.createTopic(props.getSendDestinationNameRoot() + "_" + uniqueId) + : session.createQueue(props.getSendDestinationNameRoot() + "_" + uniqueId); + + MessageConsumer consumer = + props.getReceiverConsumerBind() + ? ((props.getDurableSubscription() && props.getPubsub()) + ? session.createDurableSubscriber((Topic) destination, "testsub") : session.createConsumer(destination)) + : null; + + MessageMonitor messageMonitor = new MessageMonitor(); + + if (consumer != null) + { + consumer.setMessageListener(messageMonitor); + } + + if (!props.getReceiverConsumerActive() && (consumer != null)) + { + consumer.close(); + } + + return new CircuitEndBase(producer, consumer, session, messageMonitor, null); + } + + /// + /// Sets the sender test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setSender(TestClientDetails sender) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Sets the receiving test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setReceiver(TestClientDetails receiver) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Supplies the sending test client. + /// + /// The sending test client. + public TestClientDetails getSender() + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Supplies the receiving test client. + /// + /// The receiving test client. + public IList getReceivers() + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Accepts the conversation factory over which to hold the test coordinating conversation. + /// + /// The conversation factory to coordinate the test over. + public void setConversationFactory(ConversationFactory conversationFactory) + { + throw new RuntimeException("Not implemented."); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// MessageIdentityVector provides a message identification scheme, that matches individual messages with test cases. + /// Test messages are being sent by a number of test clients, sending messages over a set of routes, and being received + /// by another set of test clients. Each test is itself, being run within a test cycle, of which there could be many. It + /// is the job of the test coordinator to request and receive reports from the available test clients, on what has been + /// sent, what has been received, and what errors may have occurred, and to reconcile this information against the + /// assertions being applied by the test case. In order to be able to figure out which messages belong to which test, + /// there needs to be an identification scheme, that the coordinator can use to correlate messages in senders and + /// receiver reports. Every message sent in a test can be associated with this information. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Identify a test case, a handling client id, a circuit end within the client, and a test cycle number. + ///
+ ///

+ public class MessageIdentityVector + { + /// Holds the test case vector component of the message identity vector. + private TestCaseVector testCaseVector; + + /// The unique client id. + private string clientId; + + /// The unique circuit end number within the client id. + private int circuitEndId; + + /// + /// Creates a new identity vector for test messages. + /// + /// The name of the test case generating the messages. + /// The unique id of the client implementing a circuit end that is handling the messages. + /// The unique id number of the circuit end within the client. + /// The cycle iteration number of the test case. + public MessageIdentityVector(string testCase, string clientId, int circuitEndId, int testCycleNumber) + { + this.testCaseVector = new TestCaseVector(testCase, testCycleNumber); + this.clientId = clientId; + this.circuitEndId = circuitEndId; + } + + /// + /// Reports the test case vector component of the message identity vector. + /// + /// The test case vector component of the message identity vector. + public TestCaseVector getTestCaseVector() + { + return testCaseVector; + } + + /// + /// Reports the name of the test case. + /// + /// The name of the test case. + public string getTestCase() + { + return testCaseVector.getTestCase(); + } + + /// + /// Reports the test iteration cycle number within the test case. + /// + /// The test iteration cycle number within the test case. + public int getTestCycleNumber() + { + return testCaseVector.getTestCycleNumber(); + } + + /// + /// Resports the client id. + /// + /// The client id. + public string getClientId() + { + return clientId; + } + + /// + /// Reports the circuit end number within the test client. + /// + /// The circuit end number within the test client. + public int getCircuitEndId() + { + return circuitEndId; + } + + /// + /// Compares this identity vector with another for equality. All fields must match. + /// + /// The identity vector to compare with. + /// + /// true if the identity vector is identical to this one by all fields, false otherwise. + public bool equals(Object o) + { + if (this == o) + { + return true; + } + + if ((o == null) || (getClass() != o.getClass())) + { + return false; + } + + MessageIdentityVector that = (MessageIdentityVector) o; + + if (circuitEndId != that.circuitEndId) + { + return false; + } + + if ((clientId != null) ? (!clientId.equals(that.clientId)) : (that.clientId != null)) + { + return false; + } + + if ((testCaseVector != null) ? (!testCaseVector.equals(that.testCaseVector)) : (that.testCaseVector != null)) + { + return false; + } + + return true; + } + + /// + /// Computes a hash code for this identity vector based on all fields. + /// + /// A hash code for this identity vector based on all fields. + public int hashCode() + { + int result; + result = ((testCaseVector != null) ? testCaseVector.hashCode() : 0); + result = (31 * result) + ((clientId != null) ? clientId.hashCode() : 0); + result = (31 * result) + circuitEndId; + + return result; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using javax.jms.Message; +using javax.jms.MessageListener; + +using java.util.concurrent.atomic.AtomicInteger; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// MessageMonitor is used to record information about messages received. This will provide methods to check various + /// properties, such as the type, number and content of messages received in order to verify the correct behaviour of + /// tests. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Count incoming messages. + ///
Record time ellapsed since the arrival of the first message. + ///
Reset all counts and timings. + ///
+ ///

+ public class MessageMonitor : MessageListener + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(MessageMonitor)); + + /// Holds the count of messages received since the last query. + protected AtomicInteger numMessages = new AtomicInteger(); + + /// Holds the time of arrival of the first message. + protected Long firstMessageTime = null; + + /// + /// Handles received messages. Does Nothing. + /// + /// The message. Ignored. + public void onMessage(Message message) + { + // log.debug("public void onMessage(Message message): called"); + + numMessages.getAndIncrement(); + } + + /// + /// Gets the count of messages. + /// + /// The count of messages. + public int getNumMessage() + { + if (firstMessageTime == null) + { + firstMessageTime = System.nanoTime(); + } + + return numMessages.get(); + } + + /// + /// Gets the time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet. + /// + /// The time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet. + public long getTime() + { + if (firstMessageTime != null) + { + return System.nanoTime() - firstMessageTime; + } + else + { + return 0L; + } + } + + /// Resets the message count and timer to zero. + public void reset() + { + numMessages.set(0); + firstMessageTime = null; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.Session; + +using java.util.Properties; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology, + /// and test parameters for running a messaging test over that topology. A Properties object holding some of these + /// properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour. + /// + ///

A complete list of the parameters, default values and comments on their usage is provided here: + /// + ///

+ ///
Parameters
Parameter Default Comments + ///
messageSize 0 Message size in bytes. Not including any headers. + ///
destinationName ping The root name to use to generate destination names to ping. + ///
persistent false Determines whether peristent delivery is used. + ///
transacted false Determines whether messages are sent/received in transactions. + ///
broker tcp://localhost:5672 Determines the broker to connect to. + ///
virtualHost test Determines the virtual host to send all ping over. + ///
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. + ///
verbose false The verbose flag for debugging. Prints to console on every message. + ///
pubsub false Whether to ping topics or queues. Uses p2p by default. + ///
username guest The username to access the broker with. + ///
password guest The password to access the broker with. + ///
selector null Not used. Defines a message selector to filter pings with. + ///
destinationCount 1 The number of receivers listening to the pings. + ///
timeout 30000 In milliseconds. The timeout to stop waiting for replies. + ///
commitBatchSize 1 The number of messages per transaction in transactional mode. + ///
uniqueDests true Whether each receivers only listens to one ping destination or all. + ///
durableDests false Whether or not durable destinations are used. + ///
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: + /// 0 - SESSION_TRANSACTED + /// 1 - AUTO_ACKNOWLEDGE + /// 2 - CLIENT_ACKNOWLEDGE + /// 3 - DUPS_OK_ACKNOWLEDGE + /// 257 - NO_ACKNOWLEDGE + /// 258 - PRE_ACKNOWLEDGE + ///
maxPending 0 The maximum size in bytes, of messages sent but not yet received. + /// Limits the volume of messages currently buffered on the client + /// or broker. Can help scale test clients by limiting amount of buffered + /// data to avoid out of memory errors. + ///
+ /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Provide the names and defaults of all test parameters. + ///
+ ///

+ /// + /// Put a type-safe wrapper around these properties, but continue to store the parameters as properties. This is + /// simply to ensure that it is a simple matter to serialize/deserialize string/string pairs onto messages. + public class MessagingTestConfigProperties extends ParsedProperties + { + // ====================== Connection Properties ================================== + + /// Holds the name of the default connection configuration. + public static final string CONNECTION_NAME = "broker"; + + /// Holds the name of the property to get the initial context factory name from. + public static final string INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial"; + + /// Defines the class to use as the initial context factory by default. + public static final string INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + /// Holds the name of the property to get the test broker url from. + public static final string BROKER_PROPNAME = "qpid.test.broker"; + + /// Holds the default broker url for the test. + public static final string BROKER_DEFAULT = "vm://:1"; + + /// Holds the name of the property to get the test broker virtual path. + public static final string VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /// Holds the default virtual path for the test. + public static final string VIRTUAL_HOST_DEFAULT = ""; + + /// Holds the name of the property to get the broker access username from. + public static final string USERNAME_PROPNAME = "username"; + + /// Holds the default broker log on username. + public static final string USERNAME_DEFAULT = "guest"; + + /// Holds the name of the property to get the broker access password from. + public static final string PASSWORD_PROPNAME = "password"; + + /// Holds the default broker log on password. + public static final string PASSWORD_DEFAULT = "guest"; + + // ====================== Messaging Topology Properties ========================== + + /// Holds the name of the property to get the bind publisher procuder flag from. + public static final string PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind"; + + /// Holds the default value of the publisher producer flag. + public static final bool PUBLISHER_PRODUCER_BIND_DEFAULT = true; + + /// Holds the name of the property to get the bind publisher procuder flag from. + public static final string PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind"; + + /// Holds the default value of the publisher consumer flag. + public static final bool PUBLISHER_CONSUMER_BIND_DEFAULT = false; + + /// Holds the name of the property to get the bind receivers procuder flag from. + public static final string RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind"; + + /// Holds the default value of the receivers producer flag. + public static final bool RECEIVER_PRODUCER_BIND_DEFAULT = false; + + /// Holds the name of the property to get the bind receivers procuder flag from. + public static final string RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind"; + + /// Holds the default value of the receivers consumer flag. + public static final bool RECEIVER_CONSUMER_BIND_DEFAULT = true; + + /// Holds the name of the property to get the publishers consumer active flag from. + public static final string PUBLISHER_CONSUMER_ACTIVE_PROPNAME = "publisherConsumerActive"; + + /// Holds the default value of the publishers consumer active flag. + public static final bool PUBLISHER_CONSUMER_ACTIVE_DEFAULT = true; + + /// Holds the name of the property to get the receivers consumer active flag from. + public static final string RECEIVER_CONSUMER_ACTIVE_PROPNAME = "receiverConsumerActive"; + + /// Holds the default value of the receivers consumer active flag. + public static final bool RECEIVER_CONSUMER_ACTIVE_DEFAULT = true; + + /// Holds the name of the property to get the destination name root from. + public static final string SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot"; + + /// Holds the root of the name of the default destination to send to. + public static final string SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo"; + + /// Holds the name of the property to get the destination name root from. + public static final string RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot"; + + /// Holds the root of the name of the default destination to send to. + public static final string RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom"; + + /// Holds the name of the proeprty to get the destination count from. + public static final string DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /// Defines the default number of destinations to ping. + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /// Holds the name of the property to get the p2p or pub/sub messaging mode from. + public static final string PUBSUB_PROPNAME = "pubsub"; + + /// Holds the pub/sub mode default, true means ping a topic, false means ping a queue. + public static final bool PUBSUB_DEFAULT = false; + + // ====================== JMS Options and Flags ================================= + + /// Holds the name of the property to get the test delivery mode from. + public static final string PERSISTENT_MODE_PROPNAME = "persistent"; + + /// Holds the message delivery mode to use for the test. + public static final bool PERSISTENT_MODE_DEFAULT = false; + + /// Holds the name of the property to get the test transactional mode from. + public static final string TRANSACTED_PUBLISHER_PROPNAME = "transactedPublisher"; + + /// Holds the transactional mode to use for the test. + public static final bool TRANSACTED_PUBLISHER_DEFAULT = false; + + /// Holds the name of the property to get the test transactional mode from. + public static final string TRANSACTED_RECEIVER_PROPNAME = "transactedReceiver"; + + /// Holds the transactional mode to use for the test. + public static final bool TRANSACTED_RECEIVER_DEFAULT = false; + + /// Holds the name of the property to set the no local flag from. + public static final string NO_LOCAL_PROPNAME = "noLocal"; + + /// Defines the default value of the no local flag to use when consuming messages. + public static final bool NO_LOCAL_DEFAULT = false; + + /// Holds the name of the property to get the message acknowledgement mode from. + public static final string ACK_MODE_PROPNAME = "ackMode"; + + /// Defines the default message acknowledgement mode. + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /// Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. + public static final string DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription"; + + /// Defines the default value of the durable subscriptions flag. + public static final bool DURABLE_SUBSCRIPTION_DEFAULT = false; + + // ====================== Qpid/AMQP Options and Flags ================================ + + /// Holds the name of the property to set the exclusive flag from. + public static final string EXCLUSIVE_PROPNAME = "exclusive"; + + /// Defines the default value of the exclusive flag to use when consuming messages. + public static final bool EXCLUSIVE_DEFAULT = false; + + /// Holds the name of the property to set the immediate flag from. + public static final string IMMEDIATE_PROPNAME = "immediate"; + + /// Defines the default value of the immediate flag to use when sending messages. + public static final bool IMMEDIATE_DEFAULT = false; + + /// Holds the name of the property to set the mandatory flag from. + public static final string MANDATORY_PROPNAME = "mandatory"; + + /// Defines the default value of the mandatory flag to use when sending messages. + public static final bool MANDATORY_DEFAULT = false; + + /// Holds the name of the property to get the durable destinations flag from. + public static final string DURABLE_DESTS_PROPNAME = "durableDests"; + + /// Default value for the durable destinations flag. + public static final bool DURABLE_DESTS_DEFAULT = false; + + /// Holds the name of the property to set the prefetch size from. + public static final string PREFETCH_PROPNAME = "prefetch"; + + /// Defines the default prefetch size to use when consuming messages. + public static final int PREFETCH_DEFAULT = 100; + + // ====================== Common Test Parameters ================================ + + /// Holds the name of the property to get the test message size from. + public static final string MESSAGE_SIZE_PROPNAME = "messageSize"; + + /// Used to set up a default message size. + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /// Holds the name of the property to get the message rate from. + public static final string RATE_PROPNAME = "rate"; + + /// Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. + public static final int RATE_DEFAULT = 0; + + /// Holds the name of the proeprty to get the. + public static final string SELECTOR_PROPNAME = "selector"; + + /// Holds the default message selector. + public static final string SELECTOR_DEFAULT = ""; + + /// Holds the name of the property to get the waiting timeout for response messages. + public static final string TIMEOUT_PROPNAME = "timeout"; + + /// Default time to wait before assuming that a ping has timed out. + public static final long TIMEOUT_DEFAULT = 30000; + + /// Holds the name of the property to get the commit batch size from. + public static final string TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /// Defines the default number of pings to send in each transaction when running transactionally. + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /// Holds the name of the property to set the maximum amount of pending message data for a producer to hold. + public static final string MAX_PENDING_PROPNAME = "maxPending"; + + /// Defines the default maximum quantity of pending message data to allow producers to hold. + public static final int MAX_PENDING_DEFAULT = 0; + + /// Holds the name of the property to get the publisher rollback flag from. + public static final string ROLLBACK_PUBLISHER_PROPNAME = "rollbackPublisher"; + + /// Holds the default publisher roll back setting. + public static final bool ROLLBACK_PUBLISHER_DEFAULT = false; + + /// Holds the name of the property to get the publisher rollback flag from. + public static final string ROLLBACK_RECEIVER_PROPNAME = "rollbackReceiver"; + + /// Holds the default publisher roll back setting. + public static final bool ROLLBACK_RECEIVER_DEFAULT = false; + + // ====================== Options that control the bahviour of the test framework. ========================= + + /// Holds the name of the property to get the behavioural mode of not applicable assertions. + public static final string NOT_APPLICABLE_ASSERTION_PROPNAME = "notApplicableAssertion"; + + /// Holds the default behavioral mode of not applicable assertions, which is logging them as a warning. + public static final string NOT_APPLICABLE_ASSERTION_DEFAULT = "warn"; + + /// Holds the name of the property to get the verbose mode proeprty from. + public static final string VERBOSE_PROPNAME = "verbose"; + + /// Holds the default verbose mode. + public static final bool VERBOSE_DEFAULT = false; + + /// Holds the default configuration properties. + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + + defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT); + defaults.setPropertyIfNull(PUBLISHER_CONSUMER_ACTIVE_PROPNAME, PUBLISHER_CONSUMER_ACTIVE_DEFAULT); + defaults.setPropertyIfNull(RECEIVER_CONSUMER_ACTIVE_PROPNAME, RECEIVER_CONSUMER_ACTIVE_DEFAULT); + defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PUBLISHER_PROPNAME, TRANSACTED_PUBLISHER_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_RECEIVER_PROPNAME, TRANSACTED_RECEIVER_DEFAULT); + defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT); + + defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT); + defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT); + defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(PREFETCH_PROPNAME, PREFETCH_DEFAULT); + + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(ROLLBACK_PUBLISHER_PROPNAME, ROLLBACK_PUBLISHER_DEFAULT); + defaults.setPropertyIfNull(ROLLBACK_RECEIVER_PROPNAME, ROLLBACK_RECEIVER_DEFAULT); + + defaults.setPropertyIfNull(NOT_APPLICABLE_ASSERTION_PROPNAME, NOT_APPLICABLE_ASSERTION_DEFAULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + } + + /// Creates a test configuration based on the defaults. + public MessagingTestConfigProperties() + { + super(defaults); + } + + /// + /// Creates a test configuration based on the supplied properties. + /// + /// The test configuration. + public MessagingTestConfigProperties(Properties properties) + { + super(properties); + } + + /// + /// The size of test messages to send. + /// + /// The size of test messages to send. + public int getMessageSize() + { + return getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); + } + + /// + /// Flag to indicate that the publishing producer should be set up to publish to a destination. + /// + /// Flag to indicate that the publishing producer should be set up to publish to a destination. + public bool getPublisherProducerBind() + { + return getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME); + } + + /// + /// Flag to indicate that the publishing consumer should be set up to receive from a destination. + /// + /// Flag to indicate that the publishing consumer should be set up to receive from a destination. + public bool getPublisherConsumerBind() + { + return getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME); + } + + /// + /// Flag to indicate that the receiving producer should be set up to publish to a destination. + /// + /// Flag to indicate that the receiving producer should be set up to publish to a destination. + public bool getReceiverProducerBind() + { + return getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME); + } + + /// + /// Flag to indicate that the receiving consumer should be set up to receive from a destination. + /// + /// Flag to indicate that the receiving consumer should be set up to receive from a destination. + public bool getReceiverConsumerBind() + { + return getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME); + } + + /// + /// Flag to indicate that the publishing consumer should be created and actively listening. + /// + /// Flag to indicate that the publishing consumer should be created. + public bool getPublisherConsumerActive() + { + return getPropertyAsBoolean(PUBLISHER_CONSUMER_ACTIVE_PROPNAME); + } + + /// + /// Flag to indicate that the receiving consumers should be created and actively listening. + /// + /// Flag to indicate that the receiving consumers should be created and actively listening. + public bool getReceiverConsumerActive() + { + return getPropertyAsBoolean(RECEIVER_CONSUMER_ACTIVE_PROPNAME); + } + + /// + /// A root to create all test destination names from. + /// + /// A root to create all test destination names from. + public string getSendDestinationNameRoot() + { + return getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME); + } + + /// + /// A root to create all receiving destination names from. + /// + /// A root to create all receiving destination names from. + public string getReceiveDestinationNameRoot() + { + return getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME); + } + + /// + /// Flag to indicate that persistent messages should be used. + /// + /// Flag to indicate that persistent messages should be used. + public bool getPersistentMode() + { + return getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); + } + + /// + /// Flag to indicate that transactional messages should be sent by the publisher. + /// + /// Flag to indicate that transactional messages should be sent by the publisher. + public bool getPublisherTransacted() + { + return getPropertyAsBoolean(TRANSACTED_PUBLISHER_PROPNAME); + } + + /// + /// Flag to indicate that transactional receives should be used by the receiver. + /// + /// Flag to indicate that transactional receives should be used by the receiver. + public bool getReceiverTransacted() + { + return getPropertyAsBoolean(TRANSACTED_PUBLISHER_PROPNAME); + } + + /// + /// The name of the virtual host to run all tests over. + /// + /// The name of the virtual host to run all tests over. + public string getVirtualHost() + { + return getProperty(VIRTUAL_HOST_PROPNAME); + } + + /// + /// Limiting rate for each sender in messages per second, or zero for unlimited. + /// + /// Limiting rate for each sender in messages per second, or zero for unlimited. + public string getRate() + { + return getProperty(RATE_PROPNAME); + } + + /// + /// Flag to indicate that test messages should be received publish/subscribe style by all receivers. + /// + /// Flag to indicate that test messages should be received publish/subscribe style by all receivers. + public bool getPubsub() + { + return getPropertyAsBoolean(PUBSUB_PROPNAME); + } + + /// + /// The username credentials to run tests with. + /// + /// The username credentials to run tests with. + public string getUsername() + { + return getProperty(USERNAME_PROPNAME); + } + + /// + /// The password credentials to run tests with. + /// + /// The password credentials to run tests with. + public string getPassword() + { + return getProperty(PASSWORD_PROPNAME); + } + + /// + /// The timeout duration to fail tests on, should they receive no messages within it. + /// + /// The timeout duration to fail tests on, should they receive no messages within it. + public long getTimeout() + { + return getPropertyAsLong(TIMEOUT_PROPNAME); + } + + /// + /// The number of messages to batch into each transaction in transational tests. + /// + /// The number of messages to batch into each transaction in transational tests. + public int getTxBatchSize() + { + return getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); + } + + /// + /// Flag to indicate that tests should use durable destinations. + /// + /// Flag to indicate that tests should use durable destinations. + public bool getDurableDests() + { + return getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); + } + + /// + /// The ack mode for message receivers to use. + /// + /// The ack mode for message receivers to use. + public int getAckMode() + { + return getPropertyAsInteger(ACK_MODE_PROPNAME); + } + + /// + /// Flag to indicate that tests should use durable subscriptions. + /// + /// Flag to indicate that tests should use durable subscriptions. + public bool getDurableSubscription() + { + return getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME); + } + + /// + /// The maximum amount of in-flight data, in bytes, that tests should send at any time. + /// + /// The maximum amount of in-flight data, in bytes, that tests should send at any time. + public int getMaxPending() + { + return getPropertyAsInteger(MAX_PENDING_PROPNAME); + } + + /// + /// The size of the prefetch queue to use. + /// + /// The size of the prefetch queue to use. + public int getPrefetch() + { + return getPropertyAsInteger(PREFETCH_PROPNAME); + } + + /// + /// Flag to indicate that subscriptions should be no-local. + /// + /// Flag to indicate that subscriptions should be no-local. + public bool getNoLocal() + { + return getPropertyAsBoolean(NO_LOCAL_PROPNAME); + } + + /// + /// Flag to indicate that subscriptions should be exclusive. + /// + /// Flag to indicate that subscriptions should be exclusive. + public bool getExclusive() + { + return getPropertyAsBoolean(EXCLUSIVE_PROPNAME); + } + + /// + /// Flag to indicate that messages must be delivered immediately. + /// + /// Flag to indicate that messages must be delivered immediately. + public bool getImmediate() + { + return getPropertyAsBoolean(IMMEDIATE_PROPNAME); + } + + /// + /// Flag to indicate that messages must be routable. + /// + /// Flag to indicate that messages must be routable. + public bool getMandatory() + { + return getPropertyAsBoolean(MANDATORY_PROPNAME); + } + + /// + /// Gets the value of a flag to indicate that the publisher should rollback all messages sent. + /// + /// A flag to indicate that the publisher should rollback all messages sent. + public bool getRollbackPublisher() + { + return getPropertyAsBoolean(ROLLBACK_PUBLISHER_PROPNAME); + } + + /// + /// Gets the value of a flag to indicate that the receiver should rollback all messages received, then receive them + /// again. + /// + /// A flag to indicate that the publisher should rollback all messages received. + public bool getRollbackReceiver() + { + return getPropertyAsBoolean(ROLLBACK_RECEIVER_PROPNAME); + } + + /// + /// Gets the behavioural mode of not applicable assertions. Should be one of 'quiet', 'warn' or 'fail'. + /// + /// The behavioural mode of not applicable assertions. + public string getNotApplicableAssertionMode() + { + return getProperty(NOT_APPLICABLE_ASSERTION_PROPNAME); + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// NotApplicableAssertion is a messaging assertion that can be used when an assertion requested by a test-case is not + /// applicable to the testing scenario. For example an assertion may relate to AMQP functionality, but a test case may be + /// being run over a non-AMQP JMS implementation, in which case the request to create the assertion may return this + /// instead of the proper assertion. The test framework is configurable to quietly drop these assertions, log them + /// as warnings to the console, or raise them as test failures. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Quitely pass. + ///
Log a warning. + ///
Raise a test failure. + ///
+ ///

+ public class NotApplicableAssertion : Assertion + { + /// Used for logging to the console. + private static ILog console = LogManager.GetLogger("CONSOLE." + NotApplicableAssertion.class.getName()); + + /// The possible behavioural modes of this assertion. + private enum Mode + { + /// Quietly ignore the assertion by passing. + Quiet, + + /// Ignore the assertion by passing but log a warning about it. + Warn, + + /// Fail the assertion. + Fail; + } + + /// The behavioural mode of the assertion. + private Mode mode; + + /// + /// Creates an assertion that is driven by the value of the 'notApplicableAssertion' property of the test + /// configuration. Its value should match one of 'quiet', 'warn' or 'fail' and if it does not it is automatically + /// read as 'fail'. + /// + /// The test configuration properties. + public NotApplicableAssertion(ParsedProperties testProperties) + { + // Cast the test properties into a typed interface for convenience. + MessagingTestConfigProperties props = new MessagingTestConfigProperties(testProperties); + + string modeName = props.getNotApplicableAssertionMode(); + + if ("quiet".equals(modeName)) + { + mode = Mode.Quiet; + } + else if ("warn".equals(modeName)) + { + mode = Mode.Warn; + } + else + { + mode = Mode.Fail; + } + } + + /// + /// Applies the assertion. + /// + /// true if the assertion passes, false if it fails. + public bool apply() + { + switch (mode) + { + case Quiet: + return true; + + case Warn: + console.warn("Warning: Not applicable assertion being ignored."); + + return true; + + case Fail: + default: + return false; + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A Publisher represents the status of the publishing side of a test circuit. Its main purpose is to provide assertions + /// that can be applied to test the behaviour of the publishers. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide assertion that the publishers received no exceptions. + ///
+ ///

+ /// + /// There are mixtures of AMQP and JMS assertions in this interface. Either keep them here, but quietly (or with a + /// warning or error) drop them from test cases where they are not relevant, or push them down into sub-classes. + /// I am tempted to go with the dropping/warning/error approach, that would imply that it makes sense to pull + /// the assertions back from AMQPPublisher to here. + public interface Publisher + { + // Assertions that are meaningfull to AMQP and to JMS. + + /// + /// Provides an assertion that the publisher encountered no exceptions. + /// + /// The test configuration properties. + /// + /// An assertion that the publisher encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps); + + // Assertions that are meaningfull only to AMQP. + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps); + + // Assertions that are meaningfull only to Java/JMS. + + /// + /// Provides an assertion that the publisher got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. + /// + /// An assertion that the publisher got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// A Receiver is a that represents the status of the receiving side of a test circuit. Its main + /// purpose is to provide assertions that can be applied to check the behaviour of the receivers. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide assertion that the receivers received no exceptions. + ///
Provide assertion that the receivers received all test messages sent to it. + ///
+ ///

+ /// + /// There are mixtures of AMQP and JMS assertions in this interface. Either keep them here, but quietly (or with a + /// warning or error) drop them from test cases where they are not relevant, or push them down into sub-classes. + /// I am tempted to go with the dropping/warning/error approach. + public interface Receiver + { + // Assertions that are meaningfull to AMQP and to JMS. + + /// + /// Provides an assertion that the receivers encountered no exceptions. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers encountered no exceptions. + public Assertion noExceptionsAssertion(ParsedProperties testProps); + + /// + /// Provides an assertion that the receivers got all messages that were sent to it. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers got all messages that were sent to it. + public Assertion allMessagesReceivedAssertion(ParsedProperties testProps); + + /// + /// Provides an assertion that the receivers got none of the messages that were sent to it. + /// + /// The test configuration properties. + /// + /// An assertion that the receivers got none of the messages that were sent to it. + public Assertion noMessagesReceivedAssertion(ParsedProperties testProps); + + // Assertions that are meaningfull only to AMQP. + + /// + /// Provides an assertion that the AMQP channel was forcibly closed by an error condition. + /// + /// The test configuration properties. + /// + /// An assertion that the AMQP channel was forcibly closed by an error condition. + public Assertion channelClosedAssertion(ParsedProperties testProps); + + // Assertions that are meaningfull only to Java/JMS. + + /// + /// Provides an assertion that the receiver got a given exception during the test. + /// + /// The test configuration properties. + /// The exception class to check for. + /// + /// An assertion that the receiver got a given exception during the test. + public Assertion exceptionAssertion(ParsedProperties testProps, Class exceptionClass); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.Circuit; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using org.apache.qpid.util.ConversationFactory; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; +using java.util.Properties; + +namespace Apache.Qpid.Integration.Tests.framework.sequencers +{ + /// + /// BaseCircuitFactory provides some functionality common to all s, such as the details of + /// all s that make up the end-points of + /// the circuits that the factory creates, and an active that can be used to generate + /// control conversations with those circuit end-points. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Hold the details of the sending and receiving end-points to create circuits from. + ///
Provide a conversation factory to create control conversations with the end-points. + ///
+ ///

+ public abstract class BaseCircuitFactory : CircuitFactory + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(BaseCircuitFactory)); + + /// Holds the contact details for the sending test client. + protected TestClientDetails sender; + + /// Holds the contact details for the receving test client. + protected IList receivers = new LinkedList(); + + /// Holds the conversation factory over which to coordinate the test. + protected ConversationFactory conversationFactory; + + /// + /// Creates a test circuit for the test, configered by the test parameters specified. + /// + /// The test parameters. + /// A test circuit. + public Circuit createCircuit(Properties testProperties) + { + throw new RuntimeException("Not implemented."); + } + + /// + /// Sets the sender test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setSender(TestClientDetails sender) + { + log.debug("public void setSender(TestClientDetails sender = " + sender + "): called"); + + this.sender = sender; + } + + /// + /// Sets the receiving test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setReceiver(TestClientDetails receiver) + { + log.debug("public void setReceiver(TestClientDetails receivers = " + receiver + "): called"); + + this.receivers.add(receiver); + } + + /// + /// Supplies the sending test client. + /// + /// The sending test client. + public TestClientDetails getSender() + { + return sender; + } + + /// + /// Supplies the receiving test client. + /// + /// The receiving test client. + public IList getReceivers() + { + return receivers; + } + + /// + /// Accepts the conversation factory over which to hold the test coordinating conversation. + /// + /// The conversation factory to coordinate the test over. + public void setConversationFactory(ConversationFactory conversationFactory) + { + this.conversationFactory = conversationFactory; + } + + /// + /// Provides the conversation factory for providing the distributed test sequencing conversations over the test + /// connection. + /// + /// The conversation factory to create test sequencing conversations with. + public ConversationFactory getConversationFactory() + { + return conversationFactory; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Circuit; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.JMSException; +using javax.jms.Message; + +using System.Collections.Generic.IList; +using System.Collections.Generic.IDictionary; +using java.util.Properties; + +namespace Apache.Qpid.Integration.Tests.framework.sequencers +{ + /// + /// A CircuitFactory is responsibile for creating test circuits appropriate to the context that a test case is + /// running in, and providing an implementation of a standard test procedure over a test circuit. + /// + ///

+ ///
CRC Card
Responsibilities + ///
Provide a standard test procedure over a test circuit. + ///
Construct test circuits appropriate to a tests context. + ///
+ ///

+ public interface CircuitFactory + { + /// + /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles, + /// begining the test, gathering the test reports from the participants, and checking for assertion failures against + /// the test reports. + /// + /// The test circuit. + /// The list of assertions to apply to the test circuit. + /// The test case definition. + /// + /// @deprecated Use test circuits and Circuit.test instead. + public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties); + + /// + /// Creates a test circuit for the test, configered by the test parameters specified. + /// + /// The test parameters. + /// + /// A test circuit. + public Circuit createCircuit(ParsedProperties testProperties); + + /// + /// Sets the sender test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setSender(TestClientDetails sender); + + /// + /// Sets the receiving test client to coordinate the test with. + /// + /// The contact details of the sending client in the test. + public void setReceiver(TestClientDetails receiver); + + /// + /// Supplies the sending test client. + /// + /// The sending test client. + public TestClientDetails getSender(); + + /// + /// Supplies the receiving test client. + /// + /// The receiving test client. + public IList getReceivers(); + + /// + /// Accepts the conversation factory over which to hold the test coordinating conversation. + /// + /// The conversation factory to coordinate the test over. + public void setConversationFactory(ConversationFactory conversationFactory); + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Circuit; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.TestUtils; +using Apache.Qpid.Integration.Tests.framework.distributedcircuit.DistributedCircuitImpl; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.Destination; +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.Session; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; +using java.util.Properties; + +namespace Apache.Qpid.Integration.Tests.framework.sequencers +{ + /// + /// FanOutCircuitFactory is a circuit factory that creates distributed test circuits. Given a set of participating + /// test client nodes, it assigns one node to the SENDER role and the remainder to the RECEIVER role. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Create distributed circuits from one to many test nodes, for fanout style testing. + ///
+ ///

+ /// + /// Adapt this to be an n*m topology circuit factory. Need to add circuit topology definitions to the test + /// parameters. Place n senders onto the available test clients, and m receivers. Where n or m is larger than + /// the available nodes, start stacking multiple test clients on each node. There will also be an option that + /// indicates whether nodes can play both roles, and how many nodes out of all available may be assigned to + /// each role. + /// + /// The createCircuit methods on this and InteropCircuitFactory are going to be identical. This is because the + /// partitioning into senders and receivers is already done by the test decorators. Either eliminate these factories + /// as unnesesary, or move the partitioning functionality into the factories, in which case the test decorators + /// can probably be merged or eliminated. There is confusion over the placement of responsibilities between the + /// factories and the test decorators... although the test decorators may well do more than just circuit creation + /// in the future. For example, there may have to be a special decorator for test repetition that does one circuit + /// creation, but the runs many tests over it, in which case the handling of responsibilities becomes clearer. + public class FanOutCircuitFactory extends BaseCircuitFactory + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(FanOutCircuitFactory)); + + /// + /// Creates a test circuit for the test, configered by the test parameters specified. + /// + /// The test parameters. + /// A test circuit. + public Circuit createCircuit(ParsedProperties testProperties) + { + log.debug("public Circuit createCircuit(ParsedProperties testProperties): called"); + + IList senders = new LinkedList(); + senders.add(getSender()); + IList receivers = getReceivers(); + ConversationFactory conversationFactory = getConversationFactory(); + + return DistributedCircuitImpl.createCircuit(testProperties, senders, receivers, conversationFactory); + } + + /// + /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles, + /// begining the test, gathering the test reports from the participants, and checking for assertion failures against + /// the test reports. + /// + /// The test circuit. + /// The list of assertions to apply to the test circuit. + /// The test case definition. + /// + /// @deprecated Scheduled for removal once existing tests converted over to use test circuits. + public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties) + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + TestClientDetails sender = getSender(); + IList receivers = getReceivers(); + ConversationFactory conversationFactory = getConversationFactory(); + + try + { + // Create a conversation on the sender clients private control route. + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + + // Assign the sender role to the sending test client. + Message assignSender = conversationFactory.getSession().createMessage(); + TestUtils.setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + // Wait for the sender to confirm its role. + senderConversation.receive(); + + // Assign the receivers roles. + for (TestClientDetails receiver : receivers) + { + assignReceiverRole(receiver, testProperties, true); + } + + // Start the test on the sender. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + TestUtils.pause(500); + + // Ask the receivers for their reports. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + // Gather the reports from all of the receiving clients. + + // Return all of the test reports, the senders report first. + // return new Message[] { senderReport }; + } + catch (JMSException e) + { + throw new RuntimeException("Unhandled JMSException."); + } + } + + /// + /// Assigns the receivers role to the specified test client that is to act as a receivers during the test. This method + /// does not always wait for the receiving clients to confirm their role assignments. This is because this method + /// may be called from an 'onMessage' method, when a client is joining the test at a later point in time, and it + /// is not possible to do a synchronous receive during an 'onMessage' method. There is a flag to indicate whether + /// or not to wait for role confirmations. + /// + /// The test client to assign the receivers role to. + /// The test parameters. + /// Indicates whether role confirmation should be waited for. + /// + /// Any JMSExceptions occurring during the conversation are allowed to fall through. + /// + /// @deprecated Scheduled for removal once existing tests converted over to use test circuits. + protected void assignReceiverRole(TestClientDetails receiver, Properties testProperties, bool confirm) + throws JMSException + { + log.info("assignReceiverRole(TestClientDetails receivers = " + receiver + ", Map testProperties = " + + testProperties + "): called"); + + ConversationFactory conversationFactory = getConversationFactory(); + + // Create a conversation with the receiving test client. + Session session = conversationFactory.getSession(); + Destination receiverControlTopic = session.createTopic(receiver.privateControlKey); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + + // Assign the receivers role to the receiving client. + Message assignReceiver = session.createMessage(); + TestUtils.setPropertiesOnMessage(assignReceiver, testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", receiver.clientName); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + // Wait for the role confirmation to come back. + if (confirm) + { + receiverConversation.receive(); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using Apache.Qpid.Integration.Tests.framework.Assertion; +using Apache.Qpid.Integration.Tests.framework.Circuit; +using Apache.Qpid.Integration.Tests.framework.TestClientDetails; +using Apache.Qpid.Integration.Tests.framework.TestUtils; +using Apache.Qpid.Integration.Tests.framework.distributedcircuit.DistributedCircuitImpl; +using org.apache.qpid.util.ConversationFactory; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.Destination; +using javax.jms.JMSException; +using javax.jms.Message; +using javax.jms.Session; + +using System.Collections.Generic.LinkedList; +using System.Collections.Generic.IList; +using java.util.Properties; + +namespace Apache.Qpid.Integration.Tests.framework.sequencers +{ + /// + /// InteropCircuitFactory is a circuit factory that creates distributed test circuits. Given a set of participating + /// test client nodes, it assigns one node to the SENDER role and one the RECEIVER role. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Create distributed circuits from pairs of test nodes, for interop style testing. + ///
+ ///

+ /// + /// The partitioning of a set of nodes into sender and receiver roles is actually done by the interop test + /// decorator. See the todo comment in FanOutCircuitFactory about merging the factories with the decorators, or + /// more carefully dividing up responsibilities between them. + /// + /// The squenceTest code is deprecated, but currently still used by the interop tests. It will be removed once it + /// have been fully replaced by the default test procedure. + public class InteropCircuitFactory extends BaseCircuitFactory + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(InteropCircuitFactory)); + + /// + /// Creates a test circuit for the test, configered by the test parameters specified. + /// + /// The test parameters. + /// A test circuit. + public Circuit createCircuit(ParsedProperties testProperties) + { + log.debug("public Circuit createCircuit(ParsedProperties testProperties): called"); + + IList senders = new LinkedList(); + senders.add(getSender()); + IList receivers = getReceivers(); + ConversationFactory conversationFactory = getConversationFactory(); + + return DistributedCircuitImpl.createCircuit(testProperties, senders, receivers, conversationFactory); + } + + /// + /// Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles, + /// begining the test, gathering the test reports from the participants, and checking for assertion failures against + /// the test reports. + /// + /// The test circuit. + /// The list of assertions to apply to the test circuit. + /// The test case definition. + public void sequenceTest(Circuit testCircuit, IList assertions, Properties testProperties) + { + log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called"); + + TestClientDetails sender = getSender(); + IList receivers = getReceivers(); + ConversationFactory conversationFactory = getConversationFactory(); + + try + { + Session session = conversationFactory.getSession(); + Destination senderControlTopic = session.createTopic(sender.privateControlKey); + Destination receiverControlTopic = session.createTopic(receivers.get(0).privateControlKey); + + ConversationFactory.Conversation senderConversation = conversationFactory.startConversation(); + ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation(); + + Message assignSender = conversationFactory.getSession().createMessage(); + TestUtils.setPropertiesOnMessage(assignSender, testProperties); + assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignSender.setStringProperty("ROLE", "SENDER"); + + senderConversation.send(senderControlTopic, assignSender); + + // Assign the receivers role the receiving client. + Message assignReceiver = session.createMessage(); + TestUtils.setPropertiesOnMessage(assignReceiver, testProperties); + assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); + assignReceiver.setStringProperty("ROLE", "RECEIVER"); + + receiverConversation.send(receiverControlTopic, assignReceiver); + + // Wait for the senders and receivers to confirm their roles. + senderConversation.receive(); + receiverConversation.receive(); + + // Start the test. + Message start = session.createMessage(); + start.setStringProperty("CONTROL_TYPE", "START"); + + senderConversation.send(senderControlTopic, start); + + // Wait for the test sender to return its report. + Message senderReport = senderConversation.receive(); + TestUtils.pause(500); + + // Ask the receivers for its report. + Message statusRequest = session.createMessage(); + statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST"); + + receiverConversation.send(receiverControlTopic, statusRequest); + + // Wait for the receivers to send its report. + Message receiverReport = receiverConversation.receive(); + + // return new Message[] { senderReport, receiverReport }; + + // Apply assertions. + } + catch (JMSException e) + { + throw new RuntimeException("JMSException not handled."); + } + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
+ ///
+ ///

+ public class TestCaseVector + { + /// The test case name. + private string testCase; + + /// The test cycle number within the test case. + private int testCycleNumber; + + public TestCaseVector(string testCase, int testCycleNumber) + { + this.testCase = testCase; + this.testCycleNumber = testCycleNumber; + } + + public string getTestCase() + { + return testCase; + } + + public int getTestCycleNumber() + { + return testCycleNumber; + } + + public bool equals(Object o) + { + if (this == o) + { + return true; + } + + if ((o == null) || (getClass() != o.getClass())) + { + return false; + } + + TestCaseVector that = (TestCaseVector) o; + + if (testCycleNumber != that.testCycleNumber) + { + return false; + } + + if ((testCase != null) ? (!testCase.equals(that.testCase)) : (that.testCase != null)) + { + return false; + } + + return true; + } + + public int hashCode() + { + int result; + result = ((testCase != null) ? testCase.hashCode() : 0); + result = (31 * result) + testCycleNumber; + + return result; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// TestClientDetails is used to encapsulate information about an interop test client. It pairs together the unique + /// name of the client, and the route on which it listens to its control messages. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Record test clients control addresses together with their names. + ///
+ ///

+ public class TestClientDetails + { + /// The test clients name. + public string clientName; + + /// The routing key of the test clients control topic. + public string privateControlKey; + + /// + /// Two TestClientDetails are considered to be equal, iff they have the same client name. + /// + /// The object to compare to. + /// + /// If the object to compare to is a TestClientDetails equal to this one, false otherwise. + public bool equals(Object o) + { + if (this == o) + { + return true; + } + + if (!(o instanceof TestClientDetails)) + { + return false; + } + + final TestClientDetails testClientDetails = (TestClientDetails) o; + + return !((clientName != null) ? (!clientName.equals(testClientDetails.clientName)) + : (testClientDetails.clientName != null)); + } + + /// + /// Computes a hash code compatible with the equals method; based on the client name alone. + /// + /// A hash code for this. + public int hashCode() + { + return ((clientName != null) ? clientName.hashCode() : 0); + } + + /// + /// Outputs the client name and address details. Mostly used for debugging purposes. + /// + /// The client name and address. + public string ToString() + { + return "TestClientDetails: [ clientName = " + clientName + ", privateControlKey = " + privateControlKey + " ]"; + } + } +} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +using static Apache.Qpid.Integration.Tests.framework.MessagingTestConfigProperties.*; + +using uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +using javax.jms.*; +using javax.naming.Context; +using javax.naming.InitialContext; +using javax.naming.NamingException; + +using System.Collections.Generic.IDictionary; + +namespace Apache.Qpid.Integration.Tests.framework +{ + /// + /// TestUtils provides static helper methods that are usefull for writing tests against QPid. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Create connections from test properties. + ///
Create test messages. + ///
Inject a short pause in a test. + ///
Serialize properties into a message. + ///
+ ///

+ public class TestUtils + { + /// Used for debugging. + private static ILog log = LogManager.GetLogger(typeof(TestUtils)); + + /// Some dummy data to stuff all test messages with. + private static final byte[] MESSAGE_DATA_BYTES = + "Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- Test Message -- " + .getBytes(); + + /// + /// Establishes a JMS connection using a set of properties and qpids built in JNDI implementation. This is a simple + /// convenience method for code that does not anticipate handling connection failures. All exceptions that indicate + /// that the connection has failed, are wrapped as rutime exceptions, presumably handled by a top level failure + /// handler. + /// + ///

This utility makes use of the following test parameters from to control + /// the connection creation: + /// + ///

+ ///
The username. + ///
The password. + ///
The virtual host name. + ///
The broker URL. + ///
The broker name in the initial context. + /// + /// Connection properties as defined in . + /// + /// A JMS conneciton. + public static Connection createConnection(ParsedProperties messagingProps) + { + log.debug("public static Connection createConnection(ParsedProperties messagingProps = " + messagingProps + + "): called"); + + try + { + // Extract the configured connection properties from the test configuration. + string conUsername = messagingProps.getProperty(USERNAME_PROPNAME); + string conPassword = messagingProps.getProperty(PASSWORD_PROPNAME); + string virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME); + string brokerUrl = messagingProps.getProperty(BROKER_PROPNAME); + + // Create the broker connection url. + string connectionstring = + "amqp://" + conUsername + ":" + conPassword + "@clientid/" + ((virtualHost != null) ? virtualHost : "") + + "?brokerlist='" + brokerUrl + "'"; + + // Create properties to create the initial context from, and inject the connection factory configuration + // for the defined connection name into it. + messagingProps.setProperty("connectionfactory." + CONNECTION_NAME, connectionString); + + Context ctx = new InitialContext(messagingProps); + + ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME); + + return cf.createConnection(); + } + catch (NamingException e) + { + throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e); + } + catch (JMSException e) + { + throw new RuntimeException("Could not establish connection due to JMSException.", e); + } + } + + /// + /// Creates a test message of the specified size, on the given JMS session. + /// + /// The JMS session. + /// The size of the message in bytes. + /// + /// A bytes message, of the specified size, filled with dummy data. + /// + /// Any underlying JMSExceptions are allowed to fall through. + public static Message createTestMessageOfSize(Session session, int size) throws JMSException + { + BytesMessage message = session.createBytesMessage(); + + if (size > 0) + { + int div = MESSAGE_DATA_BYTES.length / size; + int mod = MESSAGE_DATA_BYTES.length % size; + + for (int i = 0; i < div; i++) + { + message.writeBytes(MESSAGE_DATA_BYTES); + } + + if (mod != 0) + { + message.writeBytes(MESSAGE_DATA_BYTES, 0, mod); + } + } + + return message; + } + + /// + /// Pauses for the specified length of time. In the event of failing to pause for at least that length of time + /// due to interuption of the thread, a RutimeException is raised to indicate the failure. The interupted status + /// of the thread is restores in that case. This method should only be used when it is expected that the pause + /// will be succesfull, for example in test code that relies on inejecting a pause. + /// + /// The minimum time to pause for in milliseconds. + public static void pause(long t) + { + try + { + Thread.sleep(t); + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + + throw new RuntimeException("Failed to generate the requested pause length.", e); + } + } + + /// + /// Sets properties of different types on a JMS Message. + /// + /// The message to set properties on. + /// The property name/value pairs to set. + /// + /// All underlying JMSExceptions are allowed to fall through. + /// + /// Move this helper method somewhere else. For example, TestUtils. + public static void setPropertiesOnMessage(Message message, Map properties) throws JMSException + { + for (Map.Entry entry : properties.entrySet()) + { + string name = entry.getKey().ToString(); + Object value = entry.getValue(); + + message.setObjectProperty(name, value); + } + } + } +} \ No newline at end of file -- cgit v1.2.1