summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-03-30 11:50:18 +0000
committerRobert Gemmell <robbie@apache.org>2010-03-30 11:50:18 +0000
commita6cb3b13f6e4c5790cb4346c771a277c29f5b516 (patch)
tree1b379bcd5f5d3aee8630b29f9efba4a121cfc100
parentd09dd6cf7e4b37b2945ea97c823636febed0bb39 (diff)
downloadqpid-python-a6cb3b13f6e4c5790cb4346c771a277c29f5b516.tar.gz
QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around the change and unsubscription of durable subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929095 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java300
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java92
-rw-r--r--java/test-profiles/Excludes2
-rw-r--r--java/test-profiles/JavaExcludes3
4 files changed, 392 insertions, 5 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index cf815fbc05..8f76141bb1 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
import javax.jms.*;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -163,5 +167,301 @@ public class DurableSubscriberTest extends QpidTestCase
durConnection2.close();
}
}
+
+ /**
+ * create and register a durable subscriber without a message selector and then unsubscribe it
+ * create and register a durable subscriber with a message selector and then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber with a message selector
+ * verify only the matching messages are received
+ */
+ public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent store");
+ return;
+ }
+
+ final String SUB_NAME=getTestQueueName();
+
+ TopicConnectionFactory factory = getConnectionFactory();
+ Topic topic = (Topic) getInitialContext().lookup(_topicName);
+
+ //create and register a durable subscriber then unsubscribe it
+ TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
+ TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME);
+ durConnection.start();
+ durSub1.close();
+ durSession.unsubscribe(SUB_NAME);
+ durSession.close();
+ durConnection.close();
+
+ //create and register a durable subscriber with a message selector and then close it
+ TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
+ TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+ durConnection2.start();
+ durSub2.close();
+ durSession2.close();
+ durConnection2.close();
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ //send messages matching and not matching the selector
+ TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
+ TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = pubSession.createMessage();
+ message.setStringProperty("testprop", "true");
+ publisher.publish(message);
+ message = pubSession.createMessage();
+ message.setStringProperty("testprop", "false");
+ publisher.publish(message);
+ }
+ publisher.close();
+ pubSession.close();
+
+ //now recreate the durable subscriber with selector to check there are no exceptions generated
+ //and then verify the messages are received correctly
+ TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
+ TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+ durConnection3.start();
+
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = durSub3.receive(2000);
+ if (message == null)
+ {
+ fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned");
+ }
+ else
+ {
+ assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector",
+ message.getStringProperty("testprop").equals("true"));
+ }
+ }
+
+ durSub3.close();
+ durSession3.unsubscribe(SUB_NAME);
+ durSession3.close();
+ durConnection3.close();
+ }
+
+
+ /**
+ * create and register a durable subscriber with a message selector and then unsubscribe it
+ * create and register a durable subscriber without a message selector and then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber without a message selector
+ * verify ALL the sent messages are received
+ */
+ public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent store");
+ return;
+ }
+
+ final String SUB_NAME=getTestQueueName();
+
+ TopicConnectionFactory factory = getConnectionFactory();
+ Topic topic = (Topic) getInitialContext().lookup(_topicName);
+
+ //create and register a durable subscriber with selector then unsubscribe it
+ TopicConnection durConnection = factory.createTopicConnection("guest", "guest");
+ TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+ durConnection.start();
+ durSub1.close();
+ durSession.unsubscribe(SUB_NAME);
+ durSession.close();
+ durConnection.close();
+
+ //create and register a durable subscriber without the message selector and then close it
+ TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
+ TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME);
+ durConnection2.start();
+ durSub2.close();
+ durSession2.close();
+ durConnection2.close();
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ //send messages matching and not matching the original used selector
+ TopicConnection pubConnection = factory.createTopicConnection("guest", "guest");
+ TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+ for (int i = 1; i <= 5; i++)
+ {
+ Message message = pubSession.createMessage();
+ message.setStringProperty("testprop", "true");
+ publisher.publish(message);
+ message = pubSession.createMessage();
+ message.setStringProperty("testprop", "false");
+ publisher.publish(message);
+ }
+ publisher.close();
+ pubSession.close();
+
+ //now recreate the durable subscriber without selector to check there are no exceptions generated
+ //then verify ALL messages sent are received
+ TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest");
+ TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME);
+ durConnection3.start();
+
+ for (int i = 1; i <= 10; i++)
+ {
+ Message message = durSub3.receive(2000);
+ if (message == null)
+ {
+ fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received");
+ }
+ }
+
+ durSub3.close();
+ durSession3.unsubscribe(SUB_NAME);
+ durSession3.close();
+ durConnection3.close();
+ }
+
+
+ public void testResubscribeWithChangedSelectorAndRestart() throws Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent store");
+ return;
+ }
+
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorAndRestart");
+ MessageProducer producer = session.createProducer(topic);
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subA = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelector",
+ "Match = True", false);
+
+ // Send 1 matching message and 1 non-matching message
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ Message rMsg = subA.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart1",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subA.receive(1000);
+ assertNull(rMsg);
+
+ // Send another 1 matching message and 1 non-matching message
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ // Disconnect subscriber without receiving the message to
+ //leave it on the underlying queue
+ subA.close();
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subB = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorAndRestart","Match = False", false);
+
+ //verify no messages are now present on the queue as changing selector should have issued
+ //an unsubscribe and thus deleted the previous durable backing queue for the subscription.
+ //check the dur sub's underlying queue now has msg count 1
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector");
+ assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue));
+
+
+ // Check that new messages are received properly
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ // Check that new messages are still received properly
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
+ subB.close();
+ session.close();
+ conn.close();
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index cafd212dd3..119949b0d6 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.test.unit.topic;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -39,6 +44,9 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
@@ -58,6 +66,36 @@ public class DurableSubscriptionTest extends QpidTestCase
/** Timeout for receive() if we are not expecting a message */
private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+ private JMXConnector _jmxc;
+ private MBeanServerConnection _mbsc;
+ private static final String USER = "admin";
+ private static final String PASSWORD = "admin";
+ private boolean _jmxConnected;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("management.enabled", "true");
+ _jmxConnected=false;
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ if(_jmxConnected)
+ {
+ try
+ {
+ _jmxc.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ super.tearDown();
+ }
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -79,6 +117,12 @@ public class DurableSubscriptionTest extends QpidTestCase
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
+
+ ((AMQSession)session1).sync();
+
+ //check the dur sub's underlying queue now has msg count 1
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
+ assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -96,11 +140,46 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+
+ ((AMQSession)session2).sync();
+
+ //check the dur sub's underlying queue now has msg count 0
+ assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
-
+
+ ((AMQSession)session2).sync();
+
+ if(isJavaBroker() && isExternalBroker())
+ {
+ //Verify that the queue was deleted by querying for its JMX MBean
+ _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
+ getManagementPort(getPort()), USER, PASSWORD);
+
+ _jmxConnected = true;
+ _mbsc = _jmxc.getMBeanServerConnection();
+
+ //must replace the occurrence of ':' in queue name with '-'
+ String queueObjectNameText = "clientid" + "-" + "MySubscription";
+
+ ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
+ + queueObjectNameText + ",*");
+
+ Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
+
+ if(objectInstances.size() != 0)
+ {
+ fail("Queue MBean was found. Expected queue to have been deleted");
+ }
+ else
+ {
+ _logger.info("Underlying dueue for the durable subscription was confirmed deleted.");
+ }
+ }
+
+ //verify unsubscribing the durable subscriber did not affect the non-durable one
_logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
@@ -459,6 +538,9 @@ public class DurableSubscriptionTest extends QpidTestCase
rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
+ // Send another 1 matching message and 1 non-matching message
+ sendMatchingAndNonMatchingMessage(session, producer);
+
// Disconnect subscriber
subA.close();
@@ -466,9 +548,15 @@ public class DurableSubscriptionTest extends QpidTestCase
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
+ //verify no messages are now present as changing selector should have issued
+ //an unsubscribe and thus deleted the previous backing queue for the subscription.
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Should not have received message as the queue underlying the " +
+ "subscription should have been cleared/deleted when the selector was changed", rMsg);
- // Check messages are recieved properly
+ // Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
+
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes
index ccb907e597..05f68cbe9d 100644
--- a/java/test-profiles/Excludes
+++ b/java/test-profiles/Excludes
@@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
+// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
diff --git a/java/test-profiles/JavaExcludes b/java/test-profiles/JavaExcludes
index 184a6f9e78..6b9f22fd93 100644
--- a/java/test-profiles/JavaExcludes
+++ b/java/test-profiles/JavaExcludes
@@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest#*
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
-
-//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*