From de2c61fdc2ee7693395264afc28b8d69c45243bf Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 25 Jan 2007 10:04:52 +0000 Subject: Race condition fixed fro AsyncTestPerf git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499716 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 85 +++++++++++++--------- 1 file changed, 50 insertions(+), 35 deletions(-) (limited to 'java/perftests/src/main') diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index be46c1805b..1368f631fb 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -76,7 +76,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */ protected static final int DEFAULT_MESSAGE_SIZE = 0; - /** This is set and used when the test is for multiple-destinations */ + /** + * This is set and used when the test is for multiple-destinations + */ protected static final int DEFAULT_DESTINATION_COUNT = 0; protected static final int DEFAULT_RATE = 0; @@ -202,10 +204,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _throttleBatchSize = (int) Math.pow(100, x); int throttleRate = rate / _throttleBatchSize; - _logger.info("rate = " + rate); - _logger.info("x = " + x); - _logger.info("_throttleBatchSize = " + _throttleBatchSize); - _logger.info("throttleRate = " + throttleRate); + _logger.debug("rate = " + rate); + _logger.debug("x = " + x); + _logger.debug("_throttleBatchSize = " + _throttleBatchSize); + _logger.debug("throttleRate = " + throttleRate); rateLimiter = new Throttle(); rateLimiter.setRate(throttleRate); @@ -519,6 +521,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (trafficLight != null) { + if (_messageListener != null) + { + _messageListener.onMessage(message); + } + _logger.trace("Reply was expected, decrementing the latch for the id."); trafficLight.countDown(); @@ -529,11 +536,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, commitTx(getConsumerSession()); } - if (_messageListener != null) - { - _messageListener.onMessage(message); - } - } else { @@ -570,32 +572,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException { - // Put a unique correlation id on the message before sending it. - String messageCorrelationId = Long.toString(getNewID()); + String messageCorrelationId = null; - pingNoWaitForReply(message, numPings, messageCorrelationId); + try + { + // Put a unique correlation id on the message before sending it. + messageCorrelationId = Long.toString(getNewID()); - CountDownLatch trafficLight = trafficLights.get(messageCorrelationId); - // Block the current thread until a reply to the message is received, or it times out. - trafficLight.await(timeout, TimeUnit.MILLISECONDS); + pingNoWaitForReply(message, numPings, messageCorrelationId); - trafficLights.remove(messageCorrelationId); + CountDownLatch trafficLight = trafficLights.get(messageCorrelationId); + // Block the current thread until a reply to the message is received, or it times out. + trafficLight.await(timeout, TimeUnit.MILLISECONDS); - // Work out how many replies were receieved. - int numReplies = numPings - (int) trafficLight.getCount(); + // Work out how many replies were receieved. + int numReplies = numPings - (int) trafficLight.getCount(); - if ((numReplies < numPings) && _verbose) - { - _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + if ((numReplies < numPings) && _verbose) + { + _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + } + else if (_verbose) + { + _logger.info("Got all replies on id, " + messageCorrelationId); + } + + commitTx(getConsumerSession()); + + return numReplies; } - else if (_verbose) + finally { - _logger.info("Got all replies on id, " + messageCorrelationId); + removeLock(messageCorrelationId); } - - commitTx(getConsumerSession()); - - return numReplies; } public long getNewID() @@ -603,14 +612,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, return idGenerator.incrementAndGet(); } + public CountDownLatch removeLock(String correlationID) + { + return trafficLights.remove(correlationID); + } + + /* - * Sends the specified ping message but does not wait for a correlating reply. - * - * @param message The message to send. - * @param numPings The number of pings to send. - * @return The reply, or null if no reply arrives before the timeout. - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ + * Sends the specified ping message but does not wait for a correlating reply. + * + * @param message The message to send. + * @param numPings The number of pings to send. + * @return The reply, or null if no reply arrives before the timeout. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException { // Create a count down latch to count the number of replies with. This is created before the message is sent -- cgit v1.2.1