diff options
Diffstat (limited to 'java/client/src/test')
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 9ff02c3b71..817fcfb9e8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -169,7 +169,48 @@ public class RecoverTest extends TestCase con.close(); } + public void testAcknowledgePerConsumer() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer2 = consumerSession.createConsumer(queue2); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + MessageProducer producer2 = producerSession.createProducer(queue2); + + producer.send(producerSession.createTextMessage("msg1")); + producer2.send(producerSession.createTextMessage("msg2")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + + TextMessage tm2 = (TextMessage) consumer2.receive(); + assertNotNull(tm2); + assertEquals("msg2",tm2.getText()); + + tm2.acknowledge(); + + consumerSession.recover(); + + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("msg1",tm1.getText()); + + con.close(); + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(RecoverTest.class); |
