From be115e3b33e6319047e8cdc3b913f14f98aa4b23 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Mon, 18 Dec 2006 09:04:09 +0000 Subject: QPID-209 : Patch supplied by Rob Godfrey - Fix acknowledge so it only acknowledges messages that have actually been consumed git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488159 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) (limited to 'java/client/src/test') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 9ff02c3b71..817fcfb9e8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -169,7 +169,48 @@ public class RecoverTest extends TestCase con.close(); } + public void testAcknowledgePerConsumer() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer2 = consumerSession.createConsumer(queue2); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + MessageProducer producer2 = producerSession.createProducer(queue2); + + producer.send(producerSession.createTextMessage("msg1")); + producer2.send(producerSession.createTextMessage("msg2")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + + TextMessage tm2 = (TextMessage) consumer2.receive(); + assertNotNull(tm2); + assertEquals("msg2",tm2.getText()); + + tm2.acknowledge(); + + consumerSession.recover(); + + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("msg1",tm1.getText()); + + con.close(); + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(RecoverTest.class); -- cgit v1.2.1