From 535e368ace120cc7d6d38afcfac289bb6c3f6750 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 23 Jan 2008 10:21:10 +0000 Subject: Qpid-755, Added session per connection variant to test, to check that durable subscription can be picked up by a fresh connection. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614481 13f79535-47bb-0310-9956-ffa450edef68 --- .../test/unit/topic/DurableSubscriptionTest.java | 101 ++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) (limited to 'java/client/src') diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 62cfc778f2..817732ce7e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -21,6 +21,14 @@ import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as + * a static on a base test helper class, e.g. TestUtils. + * + * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored + * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit, + * driven by the test model. + */ public class DurableSubscriptionTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); @@ -105,9 +113,18 @@ public class DurableSubscriptionTest extends TestCase durabilityImpl(Session.AUTO_ACKNOWLEDGE); } - private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException + public void testDurabilityNOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException + { + durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE); + } + + public void testDurabilityAUTOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException { + durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); + } + private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException + { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); AMQTopic topic = new AMQTopic(con, "MyTopic"); Session session1 = con.createSession(false, ackMode); @@ -166,6 +183,88 @@ public class DurableSubscriptionTest extends TestCase con.close(); } + private void durabilityImplSessionPerConnection(int ackMode) throws AMQException, JMSException, URLSyntaxException + { + Message msg; + + // Create producer. + AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con0.start(); + Session session0 = con0.createSession(false, ackMode); + + AMQTopic topic = new AMQTopic(con0, "MyTopic"); + + Session sessionProd = con0.createSession(false, ackMode); + MessageProducer producer = sessionProd.createProducer(topic); + + // Create consumer 1. + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con1.start(); + Session session1 = con1.createSession(false, ackMode); + + MessageConsumer consumer1 = session0.createConsumer(topic); + + // Create consumer 2. + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con2.start(); + Session session2 = con2.createSession(false, ackMode); + + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + + // Send message and check that both consumers get it and only it. + producer.send(session0.createTextMessage("A")); + + msg = consumer1.receive(500); + assertNotNull("Message should be available", msg); + assertEquals("Message Text doesn't match", "A", ((TextMessage)msg).getText()); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + msg = consumer2.receive(); + assertNotNull(msg); + assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage)msg).getText()); + msg = consumer2.receive(500); + assertNull("There should be no more messages for consumption on consumer2.", msg); + + // Detach the durable subscriber. + consumer2.close(); + session2.close(); + con2.close(); + + // Send message and receive on open consumer. + producer.send(session0.createTextMessage("B")); + + _logger.info("Receive message on consumer 1 :expecting B"); + msg = consumer1.receive(500); + assertNotNull("Consumer 1 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage)msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. + AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); + con3.start(); + Session session3 = con3.createSession(false, ackMode); + + TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); + + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage)msg).getText()); + _logger.info("Receive message on consumer 3 :expecting null"); + msg = consumer3.receive(500); + assertNull("There should be no more messages for consumption on consumer3.", msg); + + consumer1.close(); + consumer3.close(); + + con0.close(); + con1.close(); + con3.close(); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DurableSubscriptionTest.class); -- cgit v1.2.1