diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
| commit | 8dbbb9ad305ca936fb924420ec31e27e9a9d0caf (patch) | |
| tree | 949ab5a030574ea199ee9821367eedb0c84131b6 /java/systests | |
| parent | cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (diff) | |
| download | qpid-python-8dbbb9ad305ca936fb924420ec31e27e9a9d0caf.tar.gz | |
QPID-2682: Move slow consumer disconnection mechanism to the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
6 files changed, 868 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java new file mode 100644 index 0000000000..e0934faf44 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.jms.Session; +import javax.naming.NamingException; +import java.io.IOException; + +/** + * QPID-1447 : Add slow consumer detection and disconnection. + * + * Slow consumers should on a topic should expect to receive a + * 506 : Resource Error if the hit a predefined threshold. + */ +public class GlobalQueuesTest extends TestingBaseCase +{ + + protected String CONFIG_SECTION = ".queues"; + + /** + * Queue Configuration + + <slow-consumer-detection> + <!-- The depth before which the policy will be applied--> + <depth>4235264</depth> + + <!-- The message age before which the policy will be applied--> + <messageAge>600000</messageAge> + + <!-- The number of message before which the policy will be applied--> + <messageCount>50</messageCount> + + <!-- Policies configuration --> + <policy> + <name>TopicDelete</name> + <topicDelete> + <delete-persistent/> + </topicDelete> + </policy> + </slow-consumer-detection> + + */ + + /** + * VirtualHost Plugin Configuration + + <slow-consumer-detection> + <delay>1</delay> + <timeunit>MINUTES</timeunit> + </slow-consumer-detection> + + */ + + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + } + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting depth has an effect on topics + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Sets the messageAge to be half the disconnection wait timeout + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java new file mode 100644 index 0000000000..aff5d1b1b8 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +public class GlobalTopicsTest extends GlobalQueuesTest +{ + @Override + public void setUp() throws Exception + { + CONFIG_SECTION = ".topics"; + super.setUp(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java b/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java new file mode 100644 index 0000000000..e4efac60f8 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class MergeConfigurationTest extends TestingBaseCase +{ + + protected int topicCount = 0; + + + public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException + { + + setProperty(".topics.topic("+topicCount+").name", topic); + setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount)); + setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete"); + topicCount++; + } + + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1); + + //Configure topic as a subscription + setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName()); + configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1)); + + + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + +// +// public void testMerge() throws ConfigurationException, AMQException +// { +// +// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), +// false, false, _virtualHost, null); +// +// _virtualHost.getQueueRegistry().registerQueue(queue); +// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); +// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); +// +// +// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); +// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); +// +// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); +// +// assertNotNull("Queue should have topic configuration bound to it.", config); +// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); +// +// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); +// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration) +// { +// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration."); +// } +// else +// { +// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader()); +// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader()); +// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class)); +// System.err.println("********************** is a "+scdConfig.getClass()); +// } +// +// assertNotNull("Queue should have scd configuration bound to it.", scdConfig); +// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount()); +// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName()); +// } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java b/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java new file mode 100644 index 0000000000..9e9375fd44 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.jms.Session; +import javax.naming.NamingException; +import java.io.IOException; + +/** + * Test SCD when configured with Subscription details. + * + * We run the subscription based tests here to validate that the + * subscriptionname value is correctly associated with the subscription. + * + * + */ +public class SubscriptionTest extends TestingBaseCase +{ + private int _count=0; + protected String CONFIG_SECTION = ".topics.topic"; + + /** + * Add configuration for the queue that relates just to this test. + * We use the getTestQueueName() as our subscription. To ensure the + * config sections do not overlap we identify each section with a _count + * value. + * + * This would allow each test to configure more than one section. + * + * @param property to set + * @param value the value to set + * @param deleteDurable should deleteDurable be set. + * @throws NamingException + * @throws IOException + * @throws ConfigurationException + */ + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName()); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + _count++; + } + + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java new file mode 100644 index 0000000000..9831c74574 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener +{ + + Topic _destination; + protected CountDownLatch _disconnectionLatch = new CountDownLatch(1); + protected int MAX_QUEUE_MESSAGE_COUNT; + protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; + + private Thread _publisher; + protected static final long DISCONNECTION_WAIT = 5; + protected Exception _publisherError = null; + protected JMSException _connectionException = null; + private static final long JOIN_WAIT = 5000; + + @Override + public void setUp() throws Exception + { + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.delay", "1"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.timeunit", "SECONDS"); + + } + + + protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + property, value); + } + + + /** + * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT + * messages to the provided destination. Messages are sent in a new connection + * on a transaction. Any error is captured and the test is signalled to exit. + * + * @param destination + */ + private void startPublisher(final Destination destination) + { + _publisher = new Thread(new Runnable() + { + + public void run() + { + try + { + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer publisher = session.createProducer(destination); + + for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) + { + publisher.send(createNextMessage(session, count)); + session.commit(); + } + } + catch (Exception e) + { + _publisherError = e; + _disconnectionLatch.countDown(); + } + } + }); + + _publisher.start(); + } + + + + /** + * Perform the Main test of a topic Consumer with the given AckMode. + * + * Test creates a new connection and sets up the connection to prevent + * failover + * + * A new consumer is connected and started so that it will prefetch msgs. + * + * An asynchrounous publisher is started to fill the broker with messages. + * + * We then wait to be notified of the disconnection via the ExceptionListener + * + * 0-10 does not have the same notification paths but sync() apparently should + * give us the exception, currently it doesn't, so the test is excluded from 0-10 + * + * We should ensure that this test has the same path for all protocol versions. + * + * Clients should not have to modify their code based on the protocol in use. + * + * @param ackMode @see javax.jms.Session + * + * @throws Exception + */ + protected void topicConsumer(int ackMode, boolean durable) throws Exception + { + Connection connection = getConnection(); + + connection.setExceptionListener(this); + + Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); + + _destination = session.createTopic(getName()); + + MessageConsumer consumer; + + if (durable) + { + consumer = session.createDurableSubscriber(_destination, getTestQueueName()); + } + else + { + consumer = session.createConsumer(_destination); + } + + connection.start(); + + // Start the consumer pre-fetching + // Don't care about response as we will fill the broker up with messages + // after this point and ensure that the client is disconnected at the + // right point. + consumer.receiveNoWait(); + startPublisher(_destination); + + boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); + + if (!disconnected && isBroker010()) + { + try + { + ((AMQSession_0_10) session).sync(); + } + catch (AMQException amqe) + { + JMSException jmsException = new JMSException(amqe.getMessage()); + jmsException.setLinkedException(amqe); + jmsException.initCause(amqe); + _connectionException = jmsException; + } + } + + assertTrue("Client was not disconnected.", _connectionException != null); + + Exception linked = _connectionException.getLinkedException(); + + _publisher.join(JOIN_WAIT); + + assertFalse("Publisher still running", _publisher.isAlive()); + + //Validate publishing occurred ok + if (_publisherError != null) + { + throw _publisherError; + } + + // NOTE these exceptions will need to be modeled so that they are not + // 0-8 specific. e.g. JMSSessionClosedException + + assertNotNull("No error received onException listener.", _connectionException); + + assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); + + assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + + AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + } + + + // Exception Listener + + public void onException(JMSException e) + { + _connectionException = e; + + e.printStackTrace(); + + _disconnectionLatch.countDown(); + } + + /// Connection Listener + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + // Prevent Failover + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java b/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java new file mode 100644 index 0000000000..09c849cfde --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +/** + * This Topic test extends the Global queue test so it will run all the topic + * and subscription tests. + * + * We redefine the CONFIG_SECTION here so that the configuration is written + * against a topic element. + * + * To complete the migration to testing 'topic' elements we also override + * the setConfig to use the test name as the topic name. + * + */ +public class TopicTest extends GlobalQueuesTest +{ + private int _count=0; + + @Override + public void setUp() throws Exception + { + CONFIG_SECTION = ".topics.topic"; + super.setUp(); + } + + /** + * Add configuration for the queue that relates just to this test. + * We use the getTestQueueName() as our subscription. To ensure the + * config sections do not overlap we identify each section with a _count + * value. + * + * This would allow each test to configure more than one section. + * + * @param property to set + * @param value the value to set + * @param deleteDurable should deleteDurable be set. + * @throws NamingException + * @throws IOException + * @throws ConfigurationException + */ + @Override + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + "("+_count+").name", getName()); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + _count++; + } + + +} |
