diff options
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/org/apache/qpid/client/AMQSession.java | 7 | ||||
| -rw-r--r-- | java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java | 19 |
2 files changed, 20 insertions, 6 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 4768399036..3bc670e609 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -220,6 +220,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; _defaultPrefetch = defaultPrefetch; + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { _queue = new FlowControllingBlockingQueue(_defaultPrefetch, new FlowControllingBlockingQueue.ThresholdListener() { @@ -241,6 +243,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } }); + } + else + { + _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null); + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index ad2ca7b731..89e6968e44 100644 --- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -63,11 +63,14 @@ public class FlowControllingBlockingQueue public Object take() throws InterruptedException { Object o = _queue.take(); - synchronized (_listener) + if (_listener != null) { - if (--_count == (_flowControlThreshold - 1)) + synchronized(_listener) { - _listener.underThreshold(_count); + if (--_count == (_flowControlThreshold - 1)) + { + _listener.underThreshold(_count); + } } } return o; @@ -76,12 +79,16 @@ public class FlowControllingBlockingQueue public void add(Object o) { _queue.add(o); - synchronized (_listener) + if (_listener != null) { - if (++_count == _flowControlThreshold) + synchronized(_listener) { - _listener.aboveThreshold(_count); + if (++_count == _flowControlThreshold) + { + _listener.aboveThreshold(_count); + } } } } } + |
