summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 13:09:56 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 13:09:56 +0000
commit8dbbb9ad305ca936fb924420ec31e27e9a9d0caf (patch)
tree949ab5a030574ea199ee9821367eedb0c84131b6 /java/systests
parentcfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (diff)
downloadqpid-python-8dbbb9ad305ca936fb924420ec31e27e9a9d0caf.tar.gz
QPID-2682: Move slow consumer disconnection mechanism to the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java222
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java36
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java124
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java146
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java255
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java85
6 files changed, 868 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
new file mode 100644
index 0000000000..e0934faf44
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.io.IOException;
+
+/**
+ * QPID-1447 : Add slow consumer detection and disconnection.
+ *
+ * Slow consumers should on a topic should expect to receive a
+ * 506 : Resource Error if the hit a predefined threshold.
+ */
+public class GlobalQueuesTest extends TestingBaseCase
+{
+
+ protected String CONFIG_SECTION = ".queues";
+
+ /**
+ * Queue Configuration
+
+ <slow-consumer-detection>
+ <!-- The depth before which the policy will be applied-->
+ <depth>4235264</depth>
+
+ <!-- The message age before which the policy will be applied-->
+ <messageAge>600000</messageAge>
+
+ <!-- The number of message before which the policy will be applied-->
+ <messageCount>50</messageCount>
+
+ <!-- Policies configuration -->
+ <policy>
+ <name>TopicDelete</name>
+ <topicDelete>
+ <delete-persistent/>
+ </topicDelete>
+ </policy>
+ </slow-consumer-detection>
+
+ */
+
+ /**
+ * VirtualHost Plugin Configuration
+
+ <slow-consumer-detection>
+ <delay>1</delay>
+ <timeunit>MINUTES</timeunit>
+ </slow-consumer-detection>
+
+ */
+
+ public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
+ {
+ setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.name", "TopicDelete");
+
+ setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
+ property, value);
+
+ if (deleteDurable)
+ {
+ setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+ }
+ }
+
+ /**
+ * Test that setting messageCount takes affect on topics
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting depth has an effect on topics
+ *
+ * Sets the message size for the test
+ * Sets the depth to be 9 * the depth
+ * Ensure that sending 10 messages causes the disconnection
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageSize() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false);
+
+ //Start the broker
+ startBroker();
+
+ setMessageSize(MESSAGE_SIZE);
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting messageAge has an effect on topics
+ *
+ * Sets the messageAge to be half the disconnection wait timeout
+ * Send 10 messages and then ensure that we get disconnected as we will
+ * wait for the full timeout.
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageAge() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting messageCount takes affect on a durable Consumer
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+
+ public void testTopicDurableConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting depth has an effect on durable consumer topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the message size for the test
+ * Sets the depth to be 9 * the depth
+ * Ensure that sending 10 messages causes the disconnection
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageSize() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
+
+ //Start the broker
+ startBroker();
+
+ setMessageSize(MESSAGE_SIZE);
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting messageAge has an effect on topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec)
+ * Send 10 messages and then ensure that we get disconnected as we will
+ * wait for the full timeout.
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageAge() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java
new file mode 100644
index 0000000000..aff5d1b1b8
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.naming.NamingException;
+import java.io.IOException;
+
+public class GlobalTopicsTest extends GlobalQueuesTest
+{
+ @Override
+ public void setUp() throws Exception
+ {
+ CONFIG_SECTION = ".topics";
+ super.setUp();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java b/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java
new file mode 100644
index 0000000000..e4efac60f8
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MergeConfigurationTest extends TestingBaseCase
+{
+
+ protected int topicCount = 0;
+
+
+ public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException
+ {
+
+ setProperty(".topics.topic("+topicCount+").name", topic);
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount));
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete");
+ topicCount++;
+ }
+
+
+ /**
+ * Test that setting messageCount takes affect on topics
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1);
+
+ //Configure topic as a subscription
+ setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName());
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1));
+
+
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+
+//
+// public void testMerge() throws ConfigurationException, AMQException
+// {
+//
+// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"),
+// false, false, _virtualHost, null);
+//
+// _virtualHost.getQueueRegistry().registerQueue(queue);
+// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null);
+//
+//
+// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null);
+//
+// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+//
+// assertNotNull("Queue should have topic configuration bound to it.", config);
+// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName());
+//
+// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration)
+// {
+// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration.");
+// }
+// else
+// {
+// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class));
+// System.err.println("********************** is a "+scdConfig.getClass());
+// }
+//
+// assertNotNull("Queue should have scd configuration bound to it.", scdConfig);
+// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount());
+// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName());
+// }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java
new file mode 100644
index 0000000000..9e9375fd44
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.jms.Session;
+import javax.naming.NamingException;
+import java.io.IOException;
+
+/**
+ * Test SCD when configured with Subscription details.
+ *
+ * We run the subscription based tests here to validate that the
+ * subscriptionname value is correctly associated with the subscription.
+ *
+ *
+ */
+public class SubscriptionTest extends TestingBaseCase
+{
+ private int _count=0;
+ protected String CONFIG_SECTION = ".topics.topic";
+
+ /**
+ * Add configuration for the queue that relates just to this test.
+ * We use the getTestQueueName() as our subscription. To ensure the
+ * config sections do not overlap we identify each section with a _count
+ * value.
+ *
+ * This would allow each test to configure more than one section.
+ *
+ * @param property to set
+ * @param value the value to set
+ * @param deleteDurable should deleteDurable be set.
+ * @throws NamingException
+ * @throws IOException
+ * @throws ConfigurationException
+ */
+ public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
+ {
+ setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName());
+
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ "policy.name", "TopicDelete");
+
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ property, value);
+
+ if (deleteDurable)
+ {
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+ }
+ _count++;
+ }
+
+
+ /**
+ * Test that setting messageCount takes affect on a durable Consumer
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+
+ public void testTopicDurableConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting depth has an effect on durable consumer topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the message size for the test
+ * Sets the depth to be 9 * the depth
+ * Ensure that sending 10 messages causes the disconnection
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageSize() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
+
+ //Start the broker
+ startBroker();
+
+ setMessageSize(MESSAGE_SIZE);
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting messageAge has an effect on topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec)
+ * Send 10 messages and then ensure that we get disconnected as we will
+ * wait for the full timeout.
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageAge() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
new file mode 100644
index 0000000000..9831c74574
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
+{
+
+ Topic _destination;
+ protected CountDownLatch _disconnectionLatch = new CountDownLatch(1);
+ protected int MAX_QUEUE_MESSAGE_COUNT;
+ protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE;
+
+ private Thread _publisher;
+ protected static final long DISCONNECTION_WAIT = 5;
+ protected Exception _publisherError = null;
+ protected JMSException _connectionException = null;
+ private static final long JOIN_WAIT = 5000;
+
+ @Override
+ public void setUp() throws Exception
+ {
+
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".slow-consumer-detection.delay", "1");
+
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".slow-consumer-detection.timeunit", "SECONDS");
+
+ }
+
+
+ protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException
+ {
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ property, value);
+ }
+
+
+ /**
+ * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT
+ * messages to the provided destination. Messages are sent in a new connection
+ * on a transaction. Any error is captured and the test is signalled to exit.
+ *
+ * @param destination
+ */
+ private void startPublisher(final Destination destination)
+ {
+ _publisher = new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer publisher = session.createProducer(destination);
+
+ for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
+ {
+ publisher.send(createNextMessage(session, count));
+ session.commit();
+ }
+ }
+ catch (Exception e)
+ {
+ _publisherError = e;
+ _disconnectionLatch.countDown();
+ }
+ }
+ });
+
+ _publisher.start();
+ }
+
+
+
+ /**
+ * Perform the Main test of a topic Consumer with the given AckMode.
+ *
+ * Test creates a new connection and sets up the connection to prevent
+ * failover
+ *
+ * A new consumer is connected and started so that it will prefetch msgs.
+ *
+ * An asynchrounous publisher is started to fill the broker with messages.
+ *
+ * We then wait to be notified of the disconnection via the ExceptionListener
+ *
+ * 0-10 does not have the same notification paths but sync() apparently should
+ * give us the exception, currently it doesn't, so the test is excluded from 0-10
+ *
+ * We should ensure that this test has the same path for all protocol versions.
+ *
+ * Clients should not have to modify their code based on the protocol in use.
+ *
+ * @param ackMode @see javax.jms.Session
+ *
+ * @throws Exception
+ */
+ protected void topicConsumer(int ackMode, boolean durable) throws Exception
+ {
+ Connection connection = getConnection();
+
+ connection.setExceptionListener(this);
+
+ Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
+
+ _destination = session.createTopic(getName());
+
+ MessageConsumer consumer;
+
+ if (durable)
+ {
+ consumer = session.createDurableSubscriber(_destination, getTestQueueName());
+ }
+ else
+ {
+ consumer = session.createConsumer(_destination);
+ }
+
+ connection.start();
+
+ // Start the consumer pre-fetching
+ // Don't care about response as we will fill the broker up with messages
+ // after this point and ensure that the client is disconnected at the
+ // right point.
+ consumer.receiveNoWait();
+ startPublisher(_destination);
+
+ boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
+
+ if (!disconnected && isBroker010())
+ {
+ try
+ {
+ ((AMQSession_0_10) session).sync();
+ }
+ catch (AMQException amqe)
+ {
+ JMSException jmsException = new JMSException(amqe.getMessage());
+ jmsException.setLinkedException(amqe);
+ jmsException.initCause(amqe);
+ _connectionException = jmsException;
+ }
+ }
+
+ assertTrue("Client was not disconnected.", _connectionException != null);
+
+ Exception linked = _connectionException.getLinkedException();
+
+ _publisher.join(JOIN_WAIT);
+
+ assertFalse("Publisher still running", _publisher.isAlive());
+
+ //Validate publishing occurred ok
+ if (_publisherError != null)
+ {
+ throw _publisherError;
+ }
+
+ // NOTE these exceptions will need to be modeled so that they are not
+ // 0-8 specific. e.g. JMSSessionClosedException
+
+ assertNotNull("No error received onException listener.", _connectionException);
+
+ assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked);
+
+ assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass());
+
+ AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
+
+ assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
+ }
+
+
+ // Exception Listener
+
+ public void onException(JMSException e)
+ {
+ _connectionException = e;
+
+ e.printStackTrace();
+
+ _disconnectionLatch.countDown();
+ }
+
+ /// Connection Listener
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ // Prevent Failover
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ {
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java b/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java
new file mode 100644
index 0000000000..09c849cfde
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.naming.NamingException;
+import java.io.IOException;
+
+/**
+ * This Topic test extends the Global queue test so it will run all the topic
+ * and subscription tests.
+ *
+ * We redefine the CONFIG_SECTION here so that the configuration is written
+ * against a topic element.
+ *
+ * To complete the migration to testing 'topic' elements we also override
+ * the setConfig to use the test name as the topic name.
+ *
+ */
+public class TopicTest extends GlobalQueuesTest
+{
+ private int _count=0;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ CONFIG_SECTION = ".topics.topic";
+ super.setUp();
+ }
+
+ /**
+ * Add configuration for the queue that relates just to this test.
+ * We use the getTestQueueName() as our subscription. To ensure the
+ * config sections do not overlap we identify each section with a _count
+ * value.
+ *
+ * This would allow each test to configure more than one section.
+ *
+ * @param property to set
+ * @param value the value to set
+ * @param deleteDurable should deleteDurable be set.
+ * @throws NamingException
+ * @throws IOException
+ * @throws ConfigurationException
+ */
+ @Override
+ public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
+ {
+ setProperty(CONFIG_SECTION + "("+_count+").name", getName());
+
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ "policy.name", "TopicDelete");
+
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ property, value);
+
+ if (deleteDurable)
+ {
+ setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+ }
+ _count++;
+ }
+
+
+}