From 1176b9654e4dc089eeedd32f791d910048c68730 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 10 Oct 2007 15:45:56 +0000 Subject: Changed maxPending to be by message correlation id. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@583518 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 37 ++++++++++++---------- 1 file changed, 21 insertions(+), 16 deletions(-) (limited to 'java/perftests/src') diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 70e548d613..5e1f35053a 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -419,15 +419,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ protected int _maxPendingSize; - /** - * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected - * to wait until the number of unreceived message is reduced before continuing to send. - */ - protected static final Object _sendPauseMonitor = new Object(); - - /** Keeps a count of the number of message currently sent but not received. */ - protected static AtomicInteger _unreceived = new AtomicInteger(0); - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -898,7 +889,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if ((_maxPendingSize > 0)) { // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); + int unreceived = perCorrelationId._unreceived.decrementAndGet(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); @@ -906,11 +897,11 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // log.debug("unreceived = " + unreceived); // log.debug("unreceivedSize = " + unreceivedSize); - synchronized (_sendPauseMonitor) + synchronized (perCorrelationId._sendPauseMonitor) { if (unreceivedSize < _maxPendingSize) { - _sendPauseMonitor.notifyAll(); + perCorrelationId._sendPauseMonitor.notify(); } } } @@ -1169,10 +1160,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Prompt the user to kill the broker when doing failover testing. _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); + // Get the test setup for the correlation id. + String correlationID = message.getJMSCorrelationID(); + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + // If necessary, wait until the max pending message size comes within its limit. if (_maxPendingSize > 0) { - synchronized (_sendPauseMonitor) + synchronized (perCorrelationId._sendPauseMonitor) { // Used to keep track of the number of times that send has to wait. int numWaits = 0; @@ -1184,7 +1179,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti while (true) { // Get the size estimate of sent but not yet received messages. - int unreceived = _unreceived.get(); + int unreceived = perCorrelationId._unreceived.get(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); @@ -1212,7 +1207,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti try { long start = System.nanoTime(); - _sendPauseMonitor.wait(10000); + perCorrelationId._sendPauseMonitor.wait(10000); long end = System.nanoTime(); // Count the wait only if it was for > 99% of the requested wait time. @@ -1255,7 +1250,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // in pub/sub mode. if (_maxPendingSize > 0) { - int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + int newUnreceivedCount = + perCorrelationId._unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); // log.debug("newUnreceivedCount = " + newUnreceivedCount); } @@ -1676,5 +1672,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Holds the last timestamp that the timeout was reset to. */ Long timeOutStart; + + /** + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. + */ + final Object _sendPauseMonitor = new Object(); + + /** Keeps a count of the number of message currently sent but not received. */ + AtomicInteger _unreceived = new AtomicInteger(0); } } -- cgit v1.2.1