From 495a47e8feeb3cf3110ed9b972775fa40f58be00 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 18 May 2010 23:05:36 +0000 Subject: Implemented the feature described in QPID-2515 However a few issues needs to be ironed out - see the JIRA for these issues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945945 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 32 +++++++++++++---- .../qpid/client/BasicMessageConsumer_0_10.java | 40 ++++++++++++++++------ 2 files changed, 55 insertions(+), 17 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 5344f77539..4aca7454bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQSession 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), + capacity, Option.UNRELIABLE); } @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQSession 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (prefetch()) + { + capacity = getAMQConnection().getMaxPrefetch(); + } + return capacity; + } /** * Create an 0_10 message producer @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQSession 0) + { + capacity = destination.getSourceLink().getCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + } @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) { _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, - _0_10session.getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); + (getConsumerTagString(), + MessageCreditUnit.MESSAGE, + capacity, + Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (! getSession().prefetch()) + if (capacity == 0) { _syncReceive.set(false); } -- cgit v1.2.1