diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-02-24 16:56:16 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-02-24 16:56:16 +0000 |
| commit | 537ea5882ed0d6afd0202a3c793d04327470fc27 (patch) | |
| tree | b019104acf0b78ee69d02d47f6ef7aefaef5e0b8 /java | |
| parent | 761a6f7c4b91419e5c67e05e69a3eb0666532722 (diff) | |
| download | qpid-python-537ea5882ed0d6afd0202a3c793d04327470fc27.tar.gz | |
QPID-2417: update the auto-ack tests to leave an unacked message on the durable subscriptions queue between disconnect and reconnect, create consumer1 on the correct connection in SessionPerConnection tests, add single connection NO_ACK test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@915866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | 103 |
1 files changed, 80 insertions, 23 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index a907e6a29d..59c37f6b71 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -115,6 +115,11 @@ public class DurableSubscriptionTest extends QpidTestCase _logger.info("Close connection"); con.close(); } + + public void testDurabilityNOACK() throws Exception + { + durabilityImpl(AMQSession.NO_ACKNOWLEDGE); + } public void testDurabilityAUTOACK() throws Exception { @@ -138,49 +143,78 @@ public class DurableSubscriptionTest extends QpidTestCase Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); - Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session sessionProd = con.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); - Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session2 = con.createSession(false, ackMode); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); con.start(); + //send message A and check both consumers receive producer.send(session1.createTextMessage("A")); Message msg; + _logger.info("Receive message on consumer 1 :expecting A"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); + _logger.info("Receive message on consumer 2 :expecting A"); msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - consumer2.close(); - session2.close(); - + //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK) producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(500); assertNotNull("Consumer 1 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); + assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); + consumer2.close(); + session2.close(); + + //Send message C, then connect consumer 3 to durable subscription and get + //message B if not using NO_ACK, then receive C with consumer 1 and 3 + producer.send(session1.createTextMessage("C")); + Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - _logger.info("Receive message on consumer 3 :expecting B"); + if(ackMode == AMQSession.NO_ACKNOWLEDGE) + { + //Do nothing if NO_ACK was used, as prefetch means the message was dropped + //when we didn't call receive() to get it before closing consumer 2 + } + else + { + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText()); + } + + _logger.info("Receive message on consumer 1 :expecting C"); + msg = consumer1.receive(500); + assertNotNull("Consumer 1 should get message 'C'.", msg); + assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + _logger.info("Receive message on consumer 3 :expecting C"); msg = consumer3.receive(500); - assertNotNull("Consumer 3 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + assertNotNull("Consumer 3 should get message 'C'.", msg); + assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(500); assertNull("There should be no more messages for consumption on consumer3.", msg); @@ -211,7 +245,7 @@ public class DurableSubscriptionTest extends QpidTestCase con1.start(); Session session1 = con1.createSession(false, ackMode); - MessageConsumer consumer1 = session0.createConsumer(topic); + MessageConsumer consumer1 = session1.createConsumer(topic); // Create consumer 2. AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); @@ -232,37 +266,60 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); - msg = consumer2.receive(500); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer2.", msg); + // Send message and receive on consumer 1. + producer.send(session0.createTextMessage("B")); + + _logger.info("Receive message on consumer 1 :expecting B"); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertEquals(null, msg); + // Detach the durable subscriber. consumer2.close(); session2.close(); con2.close(); + + // Send message C and receive on consumer 1 + producer.send(session0.createTextMessage("C")); - // Send message and receive on open consumer. - producer.send(session0.createTextMessage("B")); - - _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting C"); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. + // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK) + // and also gets message C sent after it was disconnected. AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); con3.start(); Session session3 = con3.createSession(false, ackMode); TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - _logger.info("Receive message on consumer 3 :expecting B"); - msg = consumer3.receive(500); - assertNotNull("Consumer 3 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + if(ackMode == AMQSession.NO_ACKNOWLEDGE) + { + //Do nothing if NO_ACK was used, as prefetch means the message was dropped + //when we didn't call receive() to get it before closing consumer 2 + } + else + { + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("B", ((TextMessage) msg).getText()); + } + + _logger.info("Receive message on consumer 3 :expecting C"); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Consumer 3 should get message 'C'.", msg); + assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); - msg = consumer3.receive(500); + msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer3.", msg); consumer1.close(); |
