summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-02 09:47:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-02 09:47:21 +0000
commit6631e8e980107ad609105d4ef1bb2ee5c4275e8b (patch)
tree57984953fa4ae8c658657e762104475a52a7c1e9 /qpid/java
parente249f157f5963cfc458eca1988fb970f086ced72 (diff)
downloadqpid-python-6631e8e980107ad609105d4ef1bb2ee5c4275e8b.tar.gz
QPID-6294 : [Java Client] Allow use of 0 prefetch in AMQP 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1648987 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java61
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java14
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java83
3 files changed, 140 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d86a2739f2..bb0f0d9b13 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -824,12 +824,13 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
throws AMQException, FailoverException
{
_currentPrefetch.set(0);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ if(messagePrefetch > 0 || sizePrefetch > 0)
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+ }
}
@@ -842,13 +843,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public Boolean execute() throws AMQException, FailoverException
{
int currentPrefetch = _currentPrefetch.get();
- if (currentPrefetch >= getPrefetch())
+ if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0)
{
BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry()
.createBasicQosBody(0, currentPrefetch + 1, false);
getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
BasicQosOkBody.class);
+ if(currentPrefetch == 0 && !isSuspended())
+ {
+ sendSuspendChannel(false);
+ }
_creditChanged.set(true);
return true;
}
@@ -863,8 +868,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected void reduceCreditAfterAcknowledge() throws AMQException
{
- int acknowledgeMode = getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED;
+ boolean manageCredit = isManagingCredit();
if(manageCredit && _creditChanged.compareAndSet(true,false))
{
@@ -873,18 +877,40 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
public Void execute() throws AMQException, FailoverException
{
- BasicQosBody basicQosBody =
- getProtocolHandler().getMethodRegistry()
- .createBasicQosBody(0, getPrefetch(), false);
+ int prefetch = getPrefetch();
+ if(prefetch == 0)
+ {
+ sendSuspendChannel(true);
+ }
+ else
+ {
+ BasicQosBody basicQosBody =
+ getProtocolHandler().getMethodRegistry()
+ .createBasicQosBody(0, prefetch == -1 ? 0 : prefetch, false);
- getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
- BasicQosOkBody.class);
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()),
+ BasicQosOkBody.class);
+ }
return null;
}
}, getProtocolHandler().getConnection()).execute();
}
}
+ protected void reduceCreditInPostDeliver()
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ boolean manageCredit = (acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0;
+
+ if(manageCredit && _creditChanged.compareAndSet(true,false))
+ {
+ ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(false);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getProtocolHandler().writeFrame(channelFlowFrame, true);
+ }
+ }
+
+
protected void updateCurrentPrefetch(int delta)
{
_currentPrefetch.addAndGet(delta);
@@ -1414,6 +1440,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
+ boolean isManagingCredit()
+ {
+ int acknowledgeMode = getAcknowledgeMode();
+ return acknowledgeMode == CLIENT_ACKNOWLEDGE
+ || acknowledgeMode == SESSION_TRANSACTED
+ || ((acknowledgeMode == AUTO_ACKNOWLEDGE || acknowledgeMode == DUPS_OK_ACKNOWLEDGE) && getPrefetch() == 0);
+ }
+
+
public boolean isFlowBlocked()
{
synchronized (_flowControl)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 6718121e6b..3cc50512ed 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,8 +169,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
@Override
public Message receive(final long l) throws JMSException
{
- int acknowledgeMode = getSession().getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean manageCredit = getSession().isManagingCredit();
boolean creditModified = false;
try
{
@@ -199,8 +197,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
@Override
public Message receiveNoWait() throws JMSException
{
- int acknowledgeMode = getSession().getAcknowledgeMode();
- boolean manageCredit = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == Session.SESSION_TRANSACTED;
+ boolean manageCredit = getSession().isManagingCredit();
boolean creditModified = false;
try
{
@@ -228,4 +225,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
throw new JMSAMQException(e);
}
}
+
+
+ void postDeliver(AbstractJMSMessage msg)
+ {
+ getSession().reduceCreditInPostDeliver();
+ super.postDeliver(msg);
+ }
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
new file mode 100644
index 0000000000..6a9fa0175a
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.prefetch;
+
+import java.util.UUID;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ZeroPrefetchTest extends QpidBrokerTestCase
+{
+
+ private static final String TEST_PROPERTY_NAME = "testProp";
+
+ // send two messages to the queue, consume and acknowledge one message on connection 1
+ // create a second connection and attempt to consume the second message - this will only be possible
+ // if the first connection has no prefetch
+ public void testZeroPrefetch() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0");
+ Connection prefetch1Connection = getConnection();
+
+ prefetch1Connection.start();
+
+ final Session prefetch1session = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = prefetch1session.createQueue(getTestQueueName());
+ MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
+
+
+ Session producerSession = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+ Message firstMessage = producerSession.createMessage();
+ String firstPropertyValue = UUID.randomUUID().toString();
+ firstMessage.setStringProperty(TEST_PROPERTY_NAME, firstPropertyValue);
+ producer.send(firstMessage);
+
+ Message secondMessage = producerSession.createMessage();
+ String secondPropertyValue = UUID.randomUUID().toString();
+ secondMessage.setStringProperty(TEST_PROPERTY_NAME, secondPropertyValue);
+ producer.send(secondMessage);
+
+
+ Message receivedMessage = prefetch1consumer.receive(2000l);
+ assertNotNull("First message was not received", receivedMessage);
+ assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
+
+ Connection prefetch2Connection = getConnection();
+
+ prefetch2Connection.start();
+ final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
+
+ receivedMessage = prefetch2consumer.receive(2000l);
+ assertNotNull("Second message was not received", receivedMessage);
+ assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
+
+ }
+}