summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2008-01-23 10:21:10 +0000
committerRupert Smith <rupertlssmith@apache.org>2008-01-23 10:21:10 +0000
commit535e368ace120cc7d6d38afcfac289bb6c3f6750 (patch)
tree083f272b14eac73ba1bf4a86522d12c8d0d6b69f /java/client/src
parent6128589721fb80224648cc391a4b1dd5f6f76d23 (diff)
downloadqpid-python-535e368ace120cc7d6d38afcfac289bb6c3f6750.tar.gz
Qpid-755, Added session per connection variant to test, to check that durable subscription can be picked up by a fresh connection.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614481 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java101
1 files changed, 100 insertions, 1 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 62cfc778f2..817732ce7e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -21,6 +21,14 @@ import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
+ * a static on a base test helper class, e.g. TestUtils.
+ *
+ * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
+ * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
+ * driven by the test model.
+ */
public class DurableSubscriptionTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
@@ -105,9 +113,18 @@ public class DurableSubscriptionTest extends TestCase
durabilityImpl(Session.AUTO_ACKNOWLEDGE);
}
- private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
+ public void testDurabilityNOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
{
+ durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
+ }
+ private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
+ {
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, ackMode);
@@ -166,6 +183,88 @@ public class DurableSubscriptionTest extends TestCase
con.close();
}
+ private void durabilityImplSessionPerConnection(int ackMode) throws AMQException, JMSException, URLSyntaxException
+ {
+ Message msg;
+
+ // Create producer.
+ AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con0.start();
+ Session session0 = con0.createSession(false, ackMode);
+
+ AMQTopic topic = new AMQTopic(con0, "MyTopic");
+
+ Session sessionProd = con0.createSession(false, ackMode);
+ MessageProducer producer = sessionProd.createProducer(topic);
+
+ // Create consumer 1.
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con1.start();
+ Session session1 = con1.createSession(false, ackMode);
+
+ MessageConsumer consumer1 = session0.createConsumer(topic);
+
+ // Create consumer 2.
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con2.start();
+ Session session2 = con2.createSession(false, ackMode);
+
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+
+ // Send message and check that both consumers get it and only it.
+ producer.send(session0.createTextMessage("A"));
+
+ msg = consumer1.receive(500);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage)msg).getText());
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ msg = consumer2.receive();
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage)msg).getText());
+ msg = consumer2.receive(500);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
+
+ // Detach the durable subscriber.
+ consumer2.close();
+ session2.close();
+ con2.close();
+
+ // Send message and receive on open consumer.
+ producer.send(session0.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage)msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con3.start();
+ Session session3 = con3.createSession(false, ackMode);
+
+ TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage)msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
+ msg = consumer3.receive(500);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
+
+ consumer1.close();
+ consumer3.close();
+
+ con0.close();
+ con1.close();
+ con3.close();
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);