summaryrefslogtreecommitdiff
path: root/java/perftests/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-25 10:04:52 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-25 10:04:52 +0000
commitde2c61fdc2ee7693395264afc28b8d69c45243bf (patch)
treeed82510822df3b000e8b25e425297334458dbc90 /java/perftests/src/main
parente18eda8c992e8042f76ccb8bf2fed4b72489c44d (diff)
downloadqpid-python-de2c61fdc2ee7693395264afc28b8d69c45243bf.tar.gz
Race condition fixed fro AsyncTestPerf
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src/main')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java85
1 files changed, 50 insertions, 35 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index be46c1805b..1368f631fb 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -76,7 +76,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
protected static final int DEFAULT_MESSAGE_SIZE = 0;
- /** This is set and used when the test is for multiple-destinations */
+ /**
+ * This is set and used when the test is for multiple-destinations
+ */
protected static final int DEFAULT_DESTINATION_COUNT = 0;
protected static final int DEFAULT_RATE = 0;
@@ -202,10 +204,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_throttleBatchSize = (int) Math.pow(100, x);
int throttleRate = rate / _throttleBatchSize;
- _logger.info("rate = " + rate);
- _logger.info("x = " + x);
- _logger.info("_throttleBatchSize = " + _throttleBatchSize);
- _logger.info("throttleRate = " + throttleRate);
+ _logger.debug("rate = " + rate);
+ _logger.debug("x = " + x);
+ _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+ _logger.debug("throttleRate = " + throttleRate);
rateLimiter = new Throttle();
rateLimiter.setRate(throttleRate);
@@ -519,6 +521,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (trafficLight != null)
{
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
_logger.trace("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
@@ -529,11 +536,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
commitTx(getConsumerSession());
}
- if (_messageListener != null)
- {
- _messageListener.onMessage(message);
- }
-
}
else
{
@@ -570,32 +572,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- // Put a unique correlation id on the message before sending it.
- String messageCorrelationId = Long.toString(getNewID());
+ String messageCorrelationId = null;
- pingNoWaitForReply(message, numPings, messageCorrelationId);
+ try
+ {
+ // Put a unique correlation id on the message before sending it.
+ messageCorrelationId = Long.toString(getNewID());
- CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
- // Block the current thread until a reply to the message is received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
- trafficLights.remove(messageCorrelationId);
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ commitTx(getConsumerSession());
+
+ return numReplies;
}
- else if (_verbose)
+ finally
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ removeLock(messageCorrelationId);
}
-
- commitTx(getConsumerSession());
-
- return numReplies;
}
public long getNewID()
@@ -603,14 +612,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
return idGenerator.incrementAndGet();
}
+ public CountDownLatch removeLock(String correlationID)
+ {
+ return trafficLights.remove(correlationID);
+ }
+
+
/*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
{
// Create a count down latch to count the number of replies with. This is created before the message is sent