From 2b29cdf3d73b64ce1de5a2a1a16228bdcb8376d4 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 9 May 2007 15:20:55 +0000 Subject: Merged revisions 536483-536496 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r536483 | rgreig | 2007-05-09 11:36:06 +0100 (Wed, 09 May 2007) | 1 line Improvements made to max pending message limit code. ........ r536486 | rgreig | 2007-05-09 11:48:39 +0100 (Wed, 09 May 2007) | 1 line Added max buffer limits to all tests. ........ r536496 | rgreig | 2007-05-09 13:11:51 +0100 (Wed, 09 May 2007) | 1 line Improved safety limts for message size tests. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@536564 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 94 +++++++++++++--------- 1 file changed, 58 insertions(+), 36 deletions(-) (limited to 'qpid/java/perftests/src/main') diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 1b4fa6b779..d5d1c304e9 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -98,7 +98,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * 3 - DUPS_OK_ACKNOWLEDGE * 257 - NO_ACKNOWLEDGE * 258 - PRE_ACKNOWLEDGE - * maxPending 0 The maximum size in bytes, of messages send but not yet received. + * maxPending 0 The maximum size in bytes, of messages sent but not yet received. * Limits the volume of messages currently buffered on the client * or broker. Can help scale test clients by limiting amount of buffered * data to avoid out of memory errors. @@ -373,10 +373,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected int _maxPendingSize; /** - * Holds a cyclic barrier which is used to synchronize sender and receiver threads, where the sender has elected + * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected * to wait until the number of unreceived message is reduced before continuing to send. */ - protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2); + protected Object _sendPauseMonitor = new Object(); /** Keeps a count of the number of message currently sent but not received. */ protected AtomicInteger _unreceived = new AtomicInteger(0); @@ -801,23 +801,27 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); // Release a waiting sender if there is one. - if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize) - && (_sendPauseBarrier.getNumberWaiting() == 1)) + synchronized (_sendPauseMonitor) { - log.debug("unreceived size estimate under limit = " + unreceivedSize); - - // Wait on the send pause barrier for the limit to be re-established. - try - { - _sendPauseBarrier.await(); - } - catch (InterruptedException e) + if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) + // && (_sendPauseBarrier.getNumberWaiting() == 1)) { - throw new RuntimeException(e); - } - catch (BrokenBarrierException e) - { - throw new RuntimeException(e); + log.debug("unreceived size estimate under limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + /*try + {*/ + // _sendPauseBarrier.await(); + _sendPauseMonitor.notify(); + /*} + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + }*/ } } @@ -1052,26 +1056,40 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis waitForUser(KILL_BROKER_PROMPT); } - // Increase the count of sent but not yet received messages. - int unreceived = _unreceived.getAndIncrement(); - int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); - - if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize)) + // If necessary, wait until the max pending message size comes within its limit. + synchronized (_sendPauseMonitor) { - log.debug("unreceived size estimate over limit = " + unreceivedSize); - - // Wait on the send pause barrier for the limit to be re-established. - try - { - _sendPauseBarrier.await(); - } - catch (InterruptedException e) + while ((_maxPendingSize > 0)) { - throw new RuntimeException(e); - } - catch (BrokenBarrierException e) - { - throw new RuntimeException(e); + // Get the size estimate of sent but not yet received messages. + int unreceived = _unreceived.get(); + int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + + if (unreceivedSize > _maxPendingSize) + { + log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + try + { + // _sendPauseBarrier.await(); + _sendPauseMonitor.wait(1000); + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + /*catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + }*/ + } + else + { + break; + } } } @@ -1085,6 +1103,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _producer.send(destination, message); } + // Increase the unreceived size, this may actually happen aftern the message is recevied. + _unreceived.getAndIncrement(); + // Apply message rate throttling if a rate limit has been set up. if (_rateLimiter != null) { @@ -1300,6 +1321,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @return true if the session was committed, false if it was not. * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit * method, because commits only apply to transactional pingers, but fail after send applied to transactional and * non-transactional alike. -- cgit v1.2.1