From abc5e0fb1ca84adf1b06c4cd4fb45e55faa55f76 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Tue, 14 Aug 2007 11:06:23 +0000 Subject: Added UDP based clock syncher to test suite. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@565703 13f79535-47bb-0310-9956-ffa450edef68 --- java/perftests/pom.xml | 4 +- .../framework/clocksynch/ClockSynchronizer.java | 67 +++++ .../framework/clocksynch/UDPClockReference.java | 159 ++++++++++ .../framework/clocksynch/UDPClockSynchronizer.java | 324 +++++++++++++++++++++ .../distributedcircuit/DistributedCircuitImpl.java | 2 +- .../framework/distributedtesting/Coordinator.java | 1 - 6 files changed, 553 insertions(+), 4 deletions(-) create mode 100644 java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java create mode 100644 java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java create mode 100644 java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java (limited to 'java') diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 16e4731435..132eb83e8c 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -263,8 +263,8 @@ -n PQR-Qpid-03 -d24H -s[100000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=true commitBatchSize=100 batchSize=100000 messageSize=256 destinationCount=1 rate=0 maxPending=1000000 -n PQR-Qpid-04 -d24H -s[100000] -c[16] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=false commitBatchSize=100 batchSize=100000 messageSize=256 destinationCount=1 rate=0 maxPending=1000000 - -n PQC-Qpid-01 -d1M -s[100] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=1000000 - -n PQC-Qpid-02 -d1M -s[100] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=1000000 + -n PQC-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=1000000 + -n PQC-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=1000000 -n PQC-Qpid-03 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=1000000 -n PQC-Qpid-04 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=1000000 -n PQC-Qpid-05 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java new file mode 100644 index 0000000000..0b39a8e42a --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.clocksynch; + +/** + * ClockSynchronizer provides an interface through which two nodes may synchronize their clocks. It is expected that one + * node will act as the reference clock, to which no delta need be applied, and the other node will act as the slave, + * and which must apply a delta to its local clock to get a clock synchronized with the reference. + * + *

The slave side will initiate the computation of a clock delta by calling the {@link #synch} method. This method + * will not return until the delta has been computed, at which point there is a method to return its value, as well as + * an estimate of the likely error (usually one standard deviation), in the synchronization. For convenience there is a + * {@link #nanoTime} method to return the value of System.nanoTime() with the delta added in. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Trigger a clock synchronziation. + *
Compute a clock delta to apply to the local clock. + *
Estimate the error in the synchronzation. + *
+ */ +public interface ClockSynchronizer +{ + /** + * The slave side should call this to copute a clock delta with the reference. + */ + public void synch(); + + /** + * Gets the clock delta in nano seconds. + * + * @return The clock delta in nano seconds. + */ + public long getDelta(); + + /** + * Gets an estimate of the clock error in nan seconds. + * + * @return An estimate of the clock error in nan seconds. + */ + public long getEpsilon(); + + /** + * Gets the local clock time with any computed delta added in. + * + * @return The local clock time with any computed delta added in. + */ + public long nanoTime(); +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java new file mode 100644 index 0000000000..556dec8f25 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.clocksynch; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; + +/** + * UDPClockReference supplies a refernce clock signal (generated from System.nanoTime()). + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Supply a reference clock signal. + *
+ * + * @todo Port hard coded. Make configurable. + * + * @todo Errors rethrown as runtimes, or silently terminate the service. Could add better error handling if needed. + */ +public class UDPClockReference implements Runnable +{ + /** Defines the timeout to use when polling the socket for time requests. */ + private static final int TIMEOUT = 200; + + /** Defines the port to run the clock reference on. */ + public static final int REFERENCE_PORT = 4445; + + /** Holds the socket to receive clock reference requests on. */ + protected DatagramSocket socket = null; + + /** Flag used to indicate that the time server should keep running. Set to false to terminate. */ + protected boolean publish = true; + + /** + * Creates a clock reference service on the standard port. + */ + public UDPClockReference() + { + try + { + socket = new DatagramSocket(REFERENCE_PORT); + socket.setSoTimeout(TIMEOUT); + } + catch (SocketException e) + { + throw new RuntimeException(e); + } + } + + /** + * Implements the run loop for this reference time server. This waits for incoming time requests, and replies to + * any, with a message with the local time stamp in it. Periodically (controlled by {@link #TIMEOUT}), the run + * loop will check if the {@link #publish} flag has been cleared, and terminate the reference time service if so. + */ + public void run() + { + byte[] buf = new byte[256]; + ByteBuffer bbuf = ByteBuffer.wrap(buf); + + while (publish) + { + try + { + // Wait for a reference time request. + DatagramPacket packet = new DatagramPacket(buf, buf.length); + boolean timedOut = false; + + try + { + socket.receive(packet); + } + catch (SocketTimeoutException e) + { + timedOut = true; + } + + if (!timedOut) + { + // Work out from the received packet, where to reply to. + InetAddress address = packet.getAddress(); + int port = packet.getPort(); + + // Respond to the time request by sending back the local clock as the reference time. + bbuf.putLong(System.nanoTime()); + bbuf.flip(); + packet = new DatagramPacket(bbuf.array(), bbuf.capacity(), address, port); + + socket.send(packet); + } + } + catch (IOException e) + { + publish = false; + } + } + + socket.close(); + } + + /** + * Supplies a shutdown hook. + * + * @return The shut down hook. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + publish = false; + } + }); + } + + /** + * For testing purposes. Runs a reference clock on the default port. + * + * @param args None. + */ + public static void main(String[] args) + { + try + { + // Create the clock reference service. + UDPClockReference clock = new UDPClockReference(); + + // Set up a shutdown hook for it. + Runtime.getRuntime().addShutdownHook(clock.getShutdownHook()); + + // Start the service. + clock.run(); + } + catch (Exception e) + { + e.printStackTrace(); + System.exit(1); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java new file mode 100644 index 0000000000..1bcfe3ccc6 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.clocksynch; + +import uk.co.thebadgerset.junit.extensions.util.CommandLineParser; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; + +/** + * UDPClockSynchronizer is a {@link ClockSynchronizer} that sends pings as UDP datagrams, and uses the following simple + * algorithm to perform clock synchronization: + * + *

    + *
  1. Slave initiates synchronization with a Reference clock.
  2. + *
  3. Slave stamps current local time on a "time request" message and sends to the Reference.
  4. + *
  5. Upon receipt by Reference, Reference stamps Reference-time and returns.
  6. + *
  7. Upon receipt by Slave, Slave subtracts current time from sent time and divides by two to compute latency. It + * subtracts current time from Reference time to determine Slave-Reference time delta and adds in the + * half-latency to get the correct clock delta.
  8. + *
  9. The first result is immediately used to update the clock since it will get the local clock into at least + * the right ballpark.
  10. + *
  11. The Slave repeats steps 2 through 4, 15 more times.
  12. + *
  13. The results of the packet receipts are accumulated and sorted in lowest-latency to highest-latency order. The + * median latency is determined by picking the mid-point sample from this ordered list.
  14. + *
  15. All samples above approximately 1 standard-deviation from the median are discarded and the remaining samples + * are averaged using an arithmetic mean.
  16. + *
+ * + *

The use of UDP datagrams, instead of TCP based communication eliminates the hidden delays that TCP can introduce, + * as it can transparently re-order or re-send packets, or introduce delays as packets are naggled. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Trigger a clock synchronziation. + *
Compute a clock delta to apply to the local clock. + *
Estimate the error in the synchronzation. + *
+ */ +public class UDPClockSynchronizer implements ClockSynchronizer +{ + /** The clock delta. */ + private long delta = 0L; + + /** Holds an estimate of the clock error relative to the reference clock. */ + private long epsilon = 0L; + + /** Holds the address of the reference clock. */ + private InetAddress referenceAddress; + + /** Holds the socket to communicate with the reference service over. */ + private DatagramSocket socket; + + /** + * Creates a clock synchronizer against the specified address for the reference. + * + * @param address The address of the reference service. + */ + public UDPClockSynchronizer(String address) + { + try + { + referenceAddress = InetAddress.getByName(address); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + /** + * The slave side should call this to compute a clock delta with the reference. + */ + public void synch() + { + try + { + socket = new DatagramSocket(); + + // Synchronize on a single ping, to get the clock into the right ball-park. + synch(1); + + // Synchronize on 15 pings for greater accuracy. + synch(15); + + socket.close(); + } + catch (SocketException e) + { + throw new RuntimeException(e); + } + } + + /** + * Updates the synchronization delta by performing the specified number of reference clock requests. + * + * @param n The number of reference clock request cycles to perform. + */ + protected void synch(int n) + { + // Create an array of deltas by performing n reference pings. + long[] delta = new long[n]; + + for (int i = 0; i < n; i++) + { + delta[i] = ping(); + } + + // Reject any deltas that are larger than 1 s.d. above the median. + long median = median(delta); + long sd = standardDeviation(delta); + + long[] tempDeltas = new long[n]; + int count = 0; + + for (int i = 0; i < n; i++) + { + if (delta[i] <= (median + sd)) + { + tempDeltas[count] = delta[i]; + count++; + } + } + + System.arraycopy(tempDeltas, 0, delta, 0, count); + + // Estimate the delta as the mean of the remaining deltas. + this.delta += mean(delta); + + // Estimate the error as the standard deviation of the remaining deltas. + this.epsilon = standardDeviation(delta); + + } + + /** + * Performs a single reference clock request cycle and returns the estimated delta relative to the local clock. + * This is computed as the half-latency of the requst cycle, plus the reference clock, minus the local clock. + * + * @return The estimated clock delta. + */ + protected long ping() + { + try + { + byte[] buf = new byte[256]; + DatagramPacket packet = new DatagramPacket(buf, buf.length, referenceAddress, UDPClockReference.REFERENCE_PORT); + + // Start timing the request latency. + long start = nanoTime(); + + // Get the reference time. + socket.send(packet); + packet = new DatagramPacket(buf, buf.length); + socket.receive(packet); + + ByteBuffer bbuf = ByteBuffer.wrap(packet.getData()); + long refTime = bbuf.getLong(); + + // Stop timing the request latency. + long localTime = nanoTime(); + long end = localTime - start; + + // Estimate delta as (ref clock + half-latency) - local clock. + return ((end - start) / 2) + refTime - localTime; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /** + * Gets the clock delta in nano seconds. + * + * @return The clock delta in nano seconds. + */ + public long getDelta() + { + return delta; + } + + /** + * Gets an estimate of the clock error in nan seconds. + * + * @return An estimate of the clock error in nan seconds. + */ + public long getEpsilon() + { + return epsilon; + } + + /** + * Gets the local clock time with any computed delta added in. + * + * @return The local clock time with any computed delta added in. + */ + public long nanoTime() + { + return System.nanoTime() + delta; + } + + /** + * Computes the median of a series of values. + * + * @param values The values. + * + * @return The median. + */ + public static long median(long[] values) + { + // Check if the median is computed from a pair of middle value. + if ((values.length % 2) == 0) + { + int middle = values.length / 2; + + return (values[middle] + values[middle - 1]) / 2; + } + // The median is computed from a single middle value. + else + { + return values[values.length / 2]; + } + } + + /** + * Computes the mean of a series of values. + * + * @param values The values. + * + * @return The mean. + */ + public static long mean(long[] values) + { + long total = 0L; + + for (long value : values) + { + total += value; + } + + return total / values.length; + } + + /** + * Computes the variance of series of values. + * + * @param values The values. + * + * @return The variance of the values. + */ + public static long variance(long[] values) + { + long mean = mean(values); + + long totalVariance = 0; + + for (long value : values) + { + totalVariance += (value - mean) ^ 2; + } + + return totalVariance / values.length; + } + + /** + * Computes the standard deviation of a series of values. + * + * @param values The values. + * + * @return The standard deviation. + */ + public static long standardDeviation(long[] values) + { + return Double.valueOf(Math.sqrt(variance(values))).longValue(); + } + + /** + * For testing purposes. Supply address of reference clock as arg 1. + * + * @param args Address of reference clock as arg 1. + */ + public static void main(String[] args) + { + ParsedProperties options = + new ParsedProperties(CommandLineParser.processCommandLine(args, + new CommandLineParser( + new String[][] + { + { "1", "Address of clock reference service.", "address", "true" } + }), System.getProperties())); + + String address = options.getProperty("1"); + + // Create a clock synchronizer. + UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address); + + // Perform a clock clockSyncher. + clockSyncher.synch(); + + // Print out the clock delta and estimate of the error. + System.out.println("Delta = " + clockSyncher.getDelta()); + System.out.println("Epsilon = " + clockSyncher.getEpsilon()); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java b/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java index 921d16c998..3e359d6d2a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java @@ -296,7 +296,7 @@ public class DistributedCircuitImpl implements Circuit, TimingControllerAware * Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit, * into a report, against which assertions may be checked. * - * @todo Replace the asynch receiver report thread with a choice of direct os asynch executor, so that asynch + * @todo Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch * or synch logging of test timings is optional. Also need to provide an onMessage method that is capable * of receiving timing reports that receivers will generate during an ongoing test, on the test sample * size boundaries. The message timing logging code should be factored out as a common method that can diff --git a/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java b/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java index c4c9622091..da63ce655e 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java +++ b/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java @@ -35,7 +35,6 @@ import org.apache.qpid.test.framework.listeners.XMLTestListener; import org.apache.qpid.util.ConversationFactory; import org.apache.qpid.util.PrettyPrintingUtils; -import uk.co.thebadgerset.junit.extensions.AsymptoticTestDecorator; import uk.co.thebadgerset.junit.extensions.TKTestResult; import uk.co.thebadgerset.junit.extensions.TKTestRunner; import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator; -- cgit v1.2.1