summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java18
1 files changed, 14 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index bddbc329ab..ee7fc533a3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -22,10 +22,11 @@ package org.apache.qpid.client.util;
import java.util.Iterator;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class FlowControllingBlockingQueue
{
+ private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
+
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final Queue _queue = new ConcurrentLinkedQueue();
@@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+
+ private boolean disableFlowControl;
public boolean isEmpty()
{
@@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
+ if (highThreshold == 0)
+ {
+ disableFlowControl = true;
+ }
}
public Object take() throws InterruptedException
@@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue
}
}
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
@@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue
_listener.underThreshold(_count);
}
}
+
}
return o;
@@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue
notifyAll();
}
- if (_listener != null)
+ if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{