summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 09:04:09 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 09:04:09 +0000
commitbe115e3b33e6319047e8cdc3b913f14f98aa4b23 (patch)
tree20a5b1552b03e591f3c8b7ebd3e7962bcb3c46a2 /java/client/src/test
parent1aca5ed069697d2aa0eb743e53e787f79a299902 (diff)
downloadqpid-python-be115e3b33e6319047e8cdc3b913f14f98aa4b23.tar.gz
QPID-209 : Patch supplied by Rob Godfrey - Fix acknowledge so it only acknowledges messages that have actually been consumed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java41
1 files changed, 41 insertions, 0 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 9ff02c3b71..817fcfb9e8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -169,7 +169,48 @@ public class RecoverTest extends TestCase
con.close();
}
+ public void testAcknowledgePerConsumer() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer2 = producerSession.createProducer(queue2);
+
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer2.send(producerSession.createTextMessage("msg2"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+
+ TextMessage tm2 = (TextMessage) consumer2.receive();
+ assertNotNull(tm2);
+ assertEquals("msg2",tm2.getText());
+
+ tm2.acknowledge();
+
+ consumerSession.recover();
+
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("msg1",tm1.getText());
+
+ con.close();
+
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(RecoverTest.class);