diff options
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java | 18 |
1 files changed, 14 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index bddbc329ab..ee7fc533a3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -22,10 +22,11 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FlowControllingBlockingQueue { + private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final Queue _queue = new ConcurrentLinkedQueue(); @@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + + private boolean disableFlowControl; public boolean isEmpty() { @@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue _flowControlHighThreshold = highThreshold; _flowControlLowThreshold = lowThreshold; _listener = listener; + if (highThreshold == 0) + { + disableFlowControl = true; + } } public Object take() throws InterruptedException @@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue } } } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { @@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue _listener.underThreshold(_count); } } + } return o; @@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue notifyAll(); } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { |
