summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java16
-rw-r--r--java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java18
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java71
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java58
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java8
6 files changed, 131 insertions, 45 deletions
diff --git a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java
index e0af22cfb7..93e2a3c855 100644
--- a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java
+++ b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java
@@ -323,6 +323,22 @@ public class ScaledTestDecorator extends WrappedSuiteTestDecorator implements Sh
// Wait until all test threads have completed their setups.
barrier.await();
+
+ // Call setup on all underlying tests in the suite that are thread aware.
+ for (Test childTest : test.getAllUnderlyingTests())
+ {
+ // Check that the test is concurrency aware, so provides a setup method to call.
+ if (childTest instanceof TestThreadAware)
+ {
+ // Call the tests post thread setup.
+ TestThreadAware setupTest = (TestThreadAware) childTest;
+ setupTest.postThreadSetUp();
+ }
+ }
+
+ // Wait until all test threads have completed their prefill.
+ barrier.await();
+
// Start timing the test batch, only after thread setups have completed.
if (testResult instanceof TKTestResult)
{
diff --git a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java
index d7de2822a2..94dcf5499b 100644
--- a/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java
+++ b/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java
@@ -43,6 +43,11 @@ public interface TestThreadAware
public void threadSetUp();
/**
+ * Called after all threads have completed their setup.
+ */
+ public void postThreadSetUp();
+
+ /**
* Called when a test thread is destroyed.
*/
public void threadTearDown();
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 9ed9fea299..dc78276edd 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -154,15 +154,25 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Initialize the count and timing controller for the new correlation id.
+ // This perCorrelationId is only used for controlling the test.
+ // The PingClient itself uses its own perCorrelationId see in PingPongProducer
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
- // Start the client that will have been paused due to preFill requirement.
- // or if we have not yet started client because messages are sitting on broker.
- if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ // Must be called before pingAndWaitForReply to setup the CorrelationID.
+ // This is required because pingClient.start() will start all client threads
+ // This means that the CorrelationID must be registered before hand.
+ pingClient.setupCorrelationID(perThreadSetup._correlationId, perCorrelationId._expectedCount);
+
+ // Start the client connection if:
+ // 1) we are not in a SEND_ONLY test.
+ // 2) if we have not yet started client because messages are sitting on broker.
+ // This is either due to a preFill or a consume only test.
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME) &&
+ (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)))
{
pingClient.start();
}
@@ -285,7 +295,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
{
// Record the total latency for the batch.
// if batchSize=1 then this will just be the message latency
- tc.completeTest(true, receivedInBatch, null, _batchLatency);
+ tc.completeTest(true, receivedInBatch, null, _batchSize == 1 ? latency : _batchLatency);
// Reset latency
_batchLatency = 0;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
index 4c5df0a471..cf16abc596 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
@@ -122,9 +122,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
}
}
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
+ /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
public void threadSetUp()
{
_logger.debug("public void threadSetUp(): called");
@@ -142,9 +140,28 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
perThreadSetup._pingClient.establishConnection(true, true);
}
- // Prefill the broker unless we are in consume only mode.
- int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
- if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Called after all threads have completed their setup.
+ */
+ public void postThreadSetUp()
+ {
+ _logger.debug("public void postThreadSetUp(): called");
+
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ // Prefill the broker unless we are in consume only mode.
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+ {
+ try
{
// Manually set the correlation ID to 1. This is not ideal but it is the
// value that the main test loop will use.
@@ -156,8 +173,12 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
- // Only delay if we have consumers and a delayBeforeConsume
- if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+ // Only delay if we are
+ // not doing send only
+ // and we have consumers
+ // and a delayBeforeConsume
+ if (!(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME))
+ && (testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
&& delayBeforeConsume > 0)
{
@@ -170,11 +191,11 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
long minutes = delayBeforeConsume / 60000;
long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
- _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+ _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
}
else
{
- _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+ _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
}
}
@@ -190,22 +211,30 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
// only when the test method is executed will the correlationID map be set up and ready to consume
// the messages we have sent here.
}
- else //Only start the consumer if we are not preFilling.
+ catch (Exception e)
{
- // Only start the consumer if we don't have messages waiting to be received.
- // we need to set up the correlationID mapping first
- if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+ else //Only start the consumer if we are not preFilling.
+ {
+ // Start the consumers, unless we have data on the broker
+ // already this is signified by being in consume_only, we will
+ // start the clients after setting up the correlation IDs.
+ // We should also not start the clients if we are in Send only
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) &&
+ !(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME)))
+ {
+ // Start the client connection
+ try
{
- // Start the client connection
perThreadSetup._pingClient.start();
}
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index e3769e415e..0bf952b7e1 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -342,6 +342,11 @@ public class PingPongProducer implements Runnable, ExceptionListener
/** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
public static final boolean CONSUME_ONLY_DEFAULT = false;
+ /** Holds the name of the property to get when no messasges should be sent. */
+ public static final String SEND_ONLY_PROPNAME = "sendOnly";
+
+ /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+ public static final boolean SEND_ONLY_DEFAULT = false;
/** Holds the default configuration properties. */
public static ParsedProperties defaults = new ParsedProperties();
@@ -381,7 +386,8 @@ public class PingPongProducer implements Runnable, ExceptionListener
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
- defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
+ defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
+ defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT);
}
/** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -490,10 +496,15 @@ public class PingPongProducer implements Runnable, ExceptionListener
/**
* Holds a boolean value of wither this test should just consume, i.e. skips
* sending messages, but still expects to receive the specified number.
- * Use in conjuction with numConsumers=0 to fill the broker.
*/
private boolean _consumeOnly;
+ /**
+ * Holds a boolean value of wither this test should just send, i.e. skips
+ * consuming messages, but still creates clients just doesn't start them.
+ */
+ private boolean _sendOnly;
+
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -631,6 +642,7 @@ public class PingPongProducer implements Runnable, ExceptionListener
_preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
_delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
_consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
+ _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME);
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
@@ -1072,8 +1084,8 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
else
{
- log.warn("Got unexpected message with correlationId: " + correlationID);
- log.warn("Map contains:" + perCorrelationIds.entrySet());
+ log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID);
+ log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet());
}
}
else
@@ -1092,6 +1104,21 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
}
+ public void setupCorrelationID(String correlationId, int expectedCount)
+ {
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+
+ // Create a count down latch to count the number of replies with. This is created before the messages are
+ // sent so that the replies cannot be received before the count down is created.
+ // One is added to this, so that the last reply becomes a special case. The special case is that the
+ // chained message listener must be called before this sender can be unblocked, but that decrementing the
+ // countdown needs to be done before the chained listener can be called.
+ perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1);
+
+ perCorrelationIds.put(correlationId, perCorrelationId);
+ }
+
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -1122,33 +1149,24 @@ public class PingPongProducer implements Runnable, ExceptionListener
public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
-
- // If we are runnning a consumeOnly test then don't send any messages
-
-
/*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
+ int totalPingsRequested = numPings + preFill;
+
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
if (messageCorrelationId == null)
{
messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+
+ setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested));
}
try
{
// NDC.push("prod");
- // Create a count down latch to count the number of replies with. This is created before the messages are
- // sent so that the replies cannot be received before the count down is created.
- // One is added to this, so that the last reply becomes a special case. The special case is that the
- // chained message listener must be called before this sender can be unblocked, but that decrementing the
- // countdown needs to be done before the chained listener can be called.
- PerCorrelationId perCorrelationId = new PerCorrelationId();
-
- int totalPingsRequested = numPings + preFill;
- perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
- perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
// timeouts.
@@ -1167,9 +1185,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
//
// Return the number of requested messages, this will let the test
// report a pass.
- if (_noOfConsumers == 0)
+ if (_noOfConsumers == 0 || _sendOnly)
{
- return totalPingsRequested;
+ return getExpectedNumPings(totalPingsRequested);
}
do
diff --git a/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java b/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java
index 9397db82c9..dfb82b9b2d 100644
--- a/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java
@@ -165,6 +165,14 @@ public class MessageThroughputPerf extends FrameworkBaseCase implements TimingCo
}
/**
+ * Called after all threads have completed their setup.
+ */
+ public void postThreadSetUp()
+ {
+ //Nothing to do here, potentially implement preFill as per PingTestPerf.
+ }
+
+ /**
* Called when a test thread is destroyed.
*/
public void threadTearDown()