diff options
Diffstat (limited to 'java')
6 files changed, 131 insertions, 45 deletions
diff --git a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java index e0af22cfb7..93e2a3c855 100644 --- a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java +++ b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java @@ -323,6 +323,22 @@ public class ScaledTestDecorator extends WrappedSuiteTestDecorator implements Sh // Wait until all test threads have completed their setups. barrier.await(); + + // Call setup on all underlying tests in the suite that are thread aware. + for (Test childTest : test.getAllUnderlyingTests()) + { + // Check that the test is concurrency aware, so provides a setup method to call. + if (childTest instanceof TestThreadAware) + { + // Call the tests post thread setup. + TestThreadAware setupTest = (TestThreadAware) childTest; + setupTest.postThreadSetUp(); + } + } + + // Wait until all test threads have completed their prefill. + barrier.await(); + // Start timing the test batch, only after thread setups have completed. if (testResult instanceof TKTestResult) { diff --git a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java index d7de2822a2..94dcf5499b 100644 --- a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java +++ b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java @@ -43,6 +43,11 @@ public interface TestThreadAware public void threadSetUp(); /** + * Called after all threads have completed their setup. + */ + public void postThreadSetUp(); + + /** * Called when a test thread is destroyed. */ public void threadTearDown(); diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 9ed9fea299..dc78276edd 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -154,15 +154,25 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Initialize the count and timing controller for the new correlation id. + // This perCorrelationId is only used for controlling the test. + // The PingClient itself uses its own perCorrelationId see in PingPongProducer PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); - // Start the client that will have been paused due to preFill requirement. - // or if we have not yet started client because messages are sitting on broker. - if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + // Must be called before pingAndWaitForReply to setup the CorrelationID. + // This is required because pingClient.start() will start all client threads + // This means that the CorrelationID must be registered before hand. + pingClient.setupCorrelationID(perThreadSetup._correlationId, perCorrelationId._expectedCount); + + // Start the client connection if: + // 1) we are not in a SEND_ONLY test. + // 2) if we have not yet started client because messages are sitting on broker. + // This is either due to a preFill or a consume only test. + if (!testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME) && + (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))) { pingClient.start(); } @@ -285,7 +295,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA { // Record the total latency for the batch. // if batchSize=1 then this will just be the message latency - tc.completeTest(true, receivedInBatch, null, _batchLatency); + tc.completeTest(true, receivedInBatch, null, _batchSize == 1 ? latency : _batchLatency); // Reset latency _batchLatency = 0; } diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index 4c5df0a471..cf16abc596 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -122,9 +122,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware } } - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ public void threadSetUp() { _logger.debug("public void threadSetUp(): called"); @@ -142,9 +140,28 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient.establishConnection(true, true); } - // Prefill the broker unless we are in consume only mode. - int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); - if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Called after all threads have completed their setup. + */ + public void postThreadSetUp() + { + _logger.debug("public void postThreadSetUp(): called"); + + PerThreadSetup perThreadSetup = threadSetup.get(); + // Prefill the broker unless we are in consume only mode. + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + { + try { // Manually set the correlation ID to 1. This is not ideal but it is the // value that the main test loop will use. @@ -156,8 +173,12 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); - // Only delay if we have consumers and a delayBeforeConsume - if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) + // Only delay if we are + // not doing send only + // and we have consumers + // and a delayBeforeConsume + if (!(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME)) + && (testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) && delayBeforeConsume > 0) { @@ -170,11 +191,11 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware long minutes = delayBeforeConsume / 60000; long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); - _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); + _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); } else { - _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); + _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); } } @@ -190,22 +211,30 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // only when the test method is executed will the correlationID map be set up and ready to consume // the messages we have sent here. } - else //Only start the consumer if we are not preFilling. + catch (Exception e) { - // Only start the consumer if we don't have messages waiting to be received. - // we need to set up the correlationID mapping first - if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + _logger.warn("There was an exception during per thread setup.", e); + } + } + else //Only start the consumer if we are not preFilling. + { + // Start the consumers, unless we have data on the broker + // already this is signified by being in consume_only, we will + // start the clients after setting up the correlation IDs. + // We should also not start the clients if we are in Send only + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && + !(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME))) + { + // Start the client connection + try { - // Start the client connection perThreadSetup._pingClient.start(); } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } } - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index e3769e415e..0bf952b7e1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -342,6 +342,11 @@ public class PingPongProducer implements Runnable, ExceptionListener /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ public static final boolean CONSUME_ONLY_DEFAULT = false; + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String SEND_ONLY_PROPNAME = "sendOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean SEND_ONLY_DEFAULT = false; /** Holds the default configuration properties. */ public static ParsedProperties defaults = new ParsedProperties(); @@ -381,7 +386,8 @@ public class PingPongProducer implements Runnable, ExceptionListener defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); - defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); + defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); + defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT); } /** Allows setting of client ID on the connection, rather than through the connection URL. */ @@ -490,10 +496,15 @@ public class PingPongProducer implements Runnable, ExceptionListener /** * Holds a boolean value of wither this test should just consume, i.e. skips * sending messages, but still expects to receive the specified number. - * Use in conjuction with numConsumers=0 to fill the broker. */ private boolean _consumeOnly; + /** + * Holds a boolean value of wither this test should just send, i.e. skips + * consuming messages, but still creates clients just doesn't start them. + */ + private boolean _sendOnly; + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -631,6 +642,7 @@ public class PingPongProducer implements Runnable, ExceptionListener _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); + _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME); // Check that one or more destinations were specified. if (_noOfDestinations < 1) @@ -1072,8 +1084,8 @@ public class PingPongProducer implements Runnable, ExceptionListener } else { - log.warn("Got unexpected message with correlationId: " + correlationID); - log.warn("Map contains:" + perCorrelationIds.entrySet()); + log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID); + log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet()); } } else @@ -1092,6 +1104,21 @@ public class PingPongProducer implements Runnable, ExceptionListener } } + public void setupCorrelationID(String correlationId, int expectedCount) + { + PerCorrelationId perCorrelationId = new PerCorrelationId(); + + // Create a count down latch to count the number of replies with. This is created before the messages are + // sent so that the replies cannot be received before the count down is created. + // One is added to this, so that the last reply becomes a special case. The special case is that the + // chained message listener must be called before this sender can be unblocked, but that decrementing the + // countdown needs to be done before the chained listener can be called. + perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1); + + perCorrelationIds.put(correlationId, perCorrelationId); + } + + /** * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify @@ -1122,33 +1149,24 @@ public class PingPongProducer implements Runnable, ExceptionListener public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { - - // If we are runnning a consumeOnly test then don't send any messages - - /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + int totalPingsRequested = numPings + preFill; + // Generate a unique correlation id to put on the messages before sending them, if one was not specified. if (messageCorrelationId == null) { messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); + + setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested)); } try { // NDC.push("prod"); - // Create a count down latch to count the number of replies with. This is created before the messages are - // sent so that the replies cannot be received before the count down is created. - // One is added to this, so that the last reply becomes a special case. The special case is that the - // chained message listener must be called before this sender can be unblocked, but that decrementing the - // countdown needs to be done before the chained listener can be called. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - - int totalPingsRequested = numPings + preFill; - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1); - perCorrelationIds.put(messageCorrelationId, perCorrelationId); + PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine // timeouts. @@ -1167,9 +1185,9 @@ public class PingPongProducer implements Runnable, ExceptionListener // // Return the number of requested messages, this will let the test // report a pass. - if (_noOfConsumers == 0) + if (_noOfConsumers == 0 || _sendOnly) { - return totalPingsRequested; + return getExpectedNumPings(totalPingsRequested); } do diff --git a/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java b/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java index 9397db82c9..dfb82b9b2d 100644 --- a/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java @@ -165,6 +165,14 @@ public class MessageThroughputPerf extends FrameworkBaseCase implements TimingCo } /** + * Called after all threads have completed their setup. + */ + public void postThreadSetUp() + { + //Nothing to do here, potentially implement preFill as per PingTestPerf. + } + + /** * Called when a test thread is destroyed. */ public void threadTearDown() |
