summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-20 16:47:50 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-20 16:47:50 +0000
commit9dfc81a98121ac26304790dd474a9bb4e3962496 (patch)
treea843b654d35d4926e99ab8e5da3da32a9c6d9946 /qpid/java
parent5d8797f0deec8ee295634f42fb6ac7b366e45165 (diff)
downloadqpid-python-9dfc81a98121ac26304790dd474a9bb4e3962496.tar.gz
QPID-6294 : Fix issue whereby AUTO ACK receive consumer would only ever receive one message since _currentPrefetch would never be zeroed
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653293 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java37
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java7
2 files changed, 39 insertions, 5 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
index 7e956698d1..aab08ebf0f 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
@@ -34,6 +34,9 @@ import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.configuration.ClientProperties;
+
public class Hello
{
@@ -42,10 +45,38 @@ public class Hello
{
}
- public static void main(String[] args)
+ public static void main(String[] args) throws Exception
{
- Hello hello = new Hello();
- hello.runTest();
+ System.setProperty(ClientProperties.AMQP_VERSION, "0-91");
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0");
+ System.setProperty(ClientProperties.DEST_SYNTAX, "BURL");
+
+ Connection conn = new AMQConnection("127.0.0.1", 5672, "admin","admin", "client", "/");
+
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("queue");
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ for(int i = 0 ; i < 2 ; i ++)
+ {
+ TextMessage message = (TextMessage) consumer.receive(1000l);
+ System.out.println(message == null ? "null" : message.getText());
+ }
+ for(int i = 0 ; i < 2 ; i ++)
+ {
+ TextMessage message = session.createTextMessage("Hello " + i);
+ producer.send(message);
+ }
+
+ for(int i = 0 ; i < 2 ; i ++)
+ {
+ TextMessage message = (TextMessage) consumer.receive(1000l);
+ System.out.println(message == null ? "null" : message.getText());
+ }
}
private void runTest()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 3cc50512ed..1d7bb6087a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,7 +183,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
getSession().reduceCreditAfterAcknowledge();
}
- if (manageCredit && message != null)
+ if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
+ || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
getSession().updateCurrentPrefetch(1);
}
@@ -214,7 +216,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
getSession().reduceCreditAfterAcknowledge();
}
- if (manageCredit && message != null)
+ if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE
+ || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null)
{
getSession().updateCurrentPrefetch(1);
}