From 6631e8e980107ad609105d4ef1bb2ee5c4275e8b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 2 Jan 2015 09:47:21 +0000 Subject: QPID-6294 : [Java Client] Allow use of 0 prefetch in AMQP 0-8/9/9-1 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1648987 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_8.java | 61 ++++++++++++---- .../qpid/client/BasicMessageConsumer_0_8.java | 14 ++-- .../qpid/systest/prefetch/ZeroPrefetchTest.java | 83 ++++++++++++++++++++++ 3 files changed, 140 insertions(+), 18 deletions(-) create mode 100644 qpid/java/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d86a2739f2..bb0f0d9b13 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -824,12 +824,13 @@ public class AMQSession_0_8 extends AMQSession 0 || sizePrefetch > 0) + { + BasicQosBody basicQosBody = + getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + } } @@ -842,13 +843,17 @@ public class AMQSession_0_8 extends AMQSession= getPrefetch()) + if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0) { BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry() .createBasicQosBody(0, currentPrefetch + 1, false); getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + if(currentPrefetch == 0 && !isSuspended()) + { + sendSuspendChannel(false); + } _creditChanged.set(true); return true; } @@ -863,8 +868,7 @@ public class AMQSession_0_8 extends AMQSession