diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-20 16:47:50 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-20 16:47:50 +0000 |
| commit | 9dfc81a98121ac26304790dd474a9bb4e3962496 (patch) | |
| tree | a843b654d35d4926e99ab8e5da3da32a9c6d9946 /qpid/java | |
| parent | 5d8797f0deec8ee295634f42fb6ac7b366e45165 (diff) | |
| download | qpid-python-9dfc81a98121ac26304790dd474a9bb4e3962496.tar.gz | |
QPID-6294 : Fix issue whereby AUTO ACK receive consumer would only ever receive one message since _currentPrefetch would never be zeroed
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653293 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java | 37 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java | 7 |
2 files changed, 39 insertions, 5 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java index 7e956698d1..aab08ebf0f 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java @@ -34,6 +34,9 @@ import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.configuration.ClientProperties; + public class Hello { @@ -42,10 +45,38 @@ public class Hello { } - public static void main(String[] args) + public static void main(String[] args) throws Exception { - Hello hello = new Hello(); - hello.runTest(); + System.setProperty(ClientProperties.AMQP_VERSION, "0-91"); + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0"); + System.setProperty(ClientProperties.DEST_SYNTAX, "BURL"); + + Connection conn = new AMQConnection("127.0.0.1", 5672, "admin","admin", "client", "/"); + + conn.start(); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("queue"); + + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = (TextMessage) consumer.receive(1000l); + System.out.println(message == null ? "null" : message.getText()); + } + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = session.createTextMessage("Hello " + i); + producer.send(message); + } + + for(int i = 0 ; i < 2 ; i ++) + { + TextMessage message = (TextMessage) consumer.receive(1000l); + System.out.println(message == null ? "null" : message.getText()); + } } private void runTest() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 3cc50512ed..1d7bb6087a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,7 +183,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { getSession().reduceCreditAfterAcknowledge(); } - if (manageCredit && message != null) + if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE + || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null) { getSession().updateCurrentPrefetch(1); } @@ -214,7 +216,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe { getSession().reduceCreditAfterAcknowledge(); } - if (manageCredit && message != null) + if (manageCredit && !(getSession().getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE + || getSession().getAcknowledgeMode() == Session.DUPS_OK_ACKNOWLEDGE) && message != null) { getSession().updateCurrentPrefetch(1); } |
