From d4c28fabf668dcef1cfd67c7a73e57dc0d168dfd Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 14 Jun 2010 12:36:56 +0000 Subject: QPID-2638 : Add initial support for Topics section in configuration file. Added getQueueConfiguration(AMQQueue) which will return a new configuration for the given queue reflecting its binding status. This will allow the queue to be reconfigured during the binding process. Full Docs on this approach to appear on wiki. AMQQueue.configure and getConfiguration() have been updated to use ConfigurationPlugin rather than QueueConfiguration, The queue may be configured by a TopicConfiguration now. Update SlowConsumerTest to be GlobalQueuesTest and add a GlobalTopicsTest to match, where the config is added to the queues or topics section respectively git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954433 13f79535-47bb-0310-9956-ffa450edef68 --- .../SlowConsumerDetectionQueueConfiguration.java | 2 +- .../virtualhost/plugin/SlowConsumerDetection.java | 4 +- .../plugin/policies/TopicDeletePolicy.java | 2 +- .../org/apache/qpid/systest/GlobalQueuesTest.java | 450 ++++++++++++++++++++ .../org/apache/qpid/systest/GlobalTopicsTest.java | 36 ++ .../org/apache/qpid/systest/SlowConsumerTest.java | 465 --------------------- 6 files changed, 490 insertions(+), 469 deletions(-) create mode 100644 java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java create mode 100644 java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java delete mode 100644 java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (limited to 'java/broker-plugins/experimental') diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java index c094c7eb6d..0949204a33 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -95,7 +95,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin "('messageAge','depth' or 'messageCount') must be specified."); } - SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class); + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); Map factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index 77819f8dbf..cac52c2fdf 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -40,7 +40,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { public SlowConsumerDetection newInstance(VirtualHost vhost) { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); + SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); if (config == null) { @@ -74,7 +74,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin try { SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java index 3c67f6e6d7..c5275aa0bf 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java @@ -45,7 +45,7 @@ public class TopicDeletePolicy implements SlowConsumerPolicyPlugin public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException { TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class); + configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); TopicDeletePolicy policy = new TopicDeletePolicy(); policy.configure(config); diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java new file mode 100644 index 0000000000..513bafa8ad --- /dev/null +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -0,0 +1,450 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-1447 : Add slow consumer detection and disconnection. + * + * Slow consumers should on a topic should expect to receive a + * 506 : Resource Error if the hit a predefined threshold. + */ +public class GlobalQueuesTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener +{ + Topic _destination; + private CountDownLatch _disconnectionLatch = new CountDownLatch(1); + private int MAX_QUEUE_MESSAGE_COUNT; + private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; + + private Thread _publisher; + private static final long DISCONNECTION_WAIT = 5; + private Exception _publisherError = null; + private JMSException _connectionException = null; + private static final long JOIN_WAIT = 5000; + protected String CONFIG_SECTION = ".queues"; + + @Override + public void setUp() throws IOException, ConfigurationException, NamingException + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.delay", "1"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.timeunit", "SECONDS"); + + /** + * Queue Configuration + + + + 4235264 + + + 600000 + + + 50 + + + + TopicDelete + + + + + + + */ + + /** + * VirtualHost Plugin Configuration + + + 1 + MINUTES + + + */ + } + + /** + * Perform the Main test of a topic Consumer with the given AckMode. + * + * Test creates a new connection and sets up the connection to prevent + * failover + * + * A new consumer is connected and started so that it will prefetch msgs. + * + * An asynchrounous publisher is started to fill the broker with messages. + * + * We then wait to be notified of the disconnection via the ExceptionListener + * + * 0-10 does not have the same notification paths but sync() apparently should + * give us the exception, currently it doesn't, so the test is excluded from 0-10 + * + * We should ensure that this test has the same path for all protocol versions. + * + * Clients should not have to modify their code based on the protocol in use. + * + * @param ackMode @see javax.jms.Session + * + * @throws Exception + */ + public void topicConsumer(int ackMode, boolean durable) throws Exception + { + Connection connection = getConnection(); + + connection.setExceptionListener(this); + + Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); + + _destination = session.createTopic(getName()); + + MessageConsumer consumer; + + if (durable) + { + consumer = session.createDurableSubscriber(_destination, getTestQueueName()); + } + else + { + consumer = session.createConsumer(_destination); + } + + connection.start(); + + // Start the consumer pre-fetching + // Don't care about response as we will fill the broker up with messages + // after this point and ensure that the client is disconnected at the + // right point. + consumer.receiveNoWait(); + startPublisher(_destination); + + boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); + + if (!disconnected && isBroker010()) + { + try + { + ((AMQSession_0_10) session).sync(); + } + catch (AMQException amqe) + { + JMSException jmsException = new JMSException(amqe.getMessage()); + jmsException.setLinkedException(amqe); + jmsException.initCause(amqe); + _connectionException = jmsException; + } + } + + assertTrue("Client was not disconnected.", _connectionException != null); + + Exception linked = _connectionException.getLinkedException(); + + _publisher.join(JOIN_WAIT); + + assertFalse("Publisher still running", _publisher.isAlive()); + + //Validate publishing occurred ok + if (_publisherError != null) + { + throw _publisherError; + } + + // NOTE these exceptions will need to be modeled so that they are not + // 0-8 specific. e.g. JMSSessionClosedException + + assertNotNull("No error received onException listener.", _connectionException); + + assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); + + assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + + AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + } + + /** + * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT + * messages to the provided destination. Messages are sent in a new connection + * on a transaction. Any error is captured and the test is signalled to exit. + * + * @param destination + */ + private void startPublisher(final Destination destination) + { + _publisher = new Thread(new Runnable() + { + + public void run() + { + try + { + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer publisher = session.createProducer(destination); + + for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) + { + publisher.send(createNextMessage(session, count)); + session.commit(); + } + } + catch (Exception e) + { + _publisherError = e; + _disconnectionLatch.countDown(); + } + } + }); + + _publisher.start(); + } + + /** + * Test to write: Check that exclusive Transient Queues are not + * disconnected. i.e. JMS Temporary Queues + * + * @param ackMode + * + * @throws Exception + */ + public void exclusiveTransientQueue(int ackMode) throws Exception + { + + } + + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + } + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting depth has an effect on topics + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); + + //Start the broker + super.setUp(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Sets the messageAge to be half the disconnection wait timeout + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); + + //Start the broker + super.setUp(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + // Exception Listener + + public void onException(JMSException e) + { + _connectionException = e; + + System.out.println("***** SCT Received Exception: " + e); + e.printStackTrace(); + + _disconnectionLatch.countDown(); + } + + + /// Connection Listener + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + // Prevent Failover + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } +} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java new file mode 100644 index 0000000000..1f8103fa3c --- /dev/null +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +public class GlobalTopicsTest extends GlobalQueuesTest +{ + @Override + public void setUp() throws NamingException, IOException, ConfigurationException + { + CONFIG_SECTION = ".topics"; + super.setUp(); + } +} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java deleted file mode 100644 index f0be3d2db0..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ /dev/null @@ -1,465 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.NamingException; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-1447 : Add slow consumer detection and disconnection. - * - * Slow consumers should on a topic should expect to receive a - * 506 : Resource Error if the hit a predefined threshold. - */ -public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener -{ - Topic _destination; - private CountDownLatch _disconnectionLatch = new CountDownLatch(1); - private int MAX_QUEUE_MESSAGE_COUNT; - private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; - - private Thread _publisher; - private static final long DISCONNECTION_WAIT = 5; - private Exception _publisherError = null; - private JMSException _connectionException = null; - private static final long JOIN_WAIT = 5000; - - @Override - public void setUp() throws IOException, ConfigurationException, NamingException - { - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.delay", "1"); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.timeunit", "SECONDS"); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.name", "TopicDelete"); - - - /** - * Queue Configuration - - - - 4235264 - - - 600000 - - - 50 - - - - TopicDelete - - - - - - - */ - - /** - * VirtualHost Plugin Configuration - - - 1 - MINUTES - - - */ - } - - /** - * Perform the Main test of a topic Consumer with the given AckMode. - * - * Test creates a new connection and sets up the connection to prevent - * failover - * - * A new consumer is connected and started so that it will prefetch msgs. - * - * An asynchrounous publisher is started to fill the broker with messages. - * - * We then wait to be notified of the disconnection via the ExceptionListener - * - * 0-10 does not have the same notification paths but sync() apparently should - * give us the exception, currently it doesn't, so the test is excluded from 0-10 - * - * We should ensure that this test has the same path for all protocol versions. - * - * Clients should not have to modify their code based on the protocol in use. - * - * @param ackMode @see javax.jms.Session - * @throws Exception - */ - public void topicConsumer(int ackMode, boolean durable) throws Exception - { - Connection connection = getConnection(); - - connection.setExceptionListener(this); - - Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); - - _destination = session.createTopic(getName()); - - MessageConsumer consumer; - - if (durable) - { - consumer = session.createDurableSubscriber(_destination, getTestQueueName()); - } - else - { - consumer = session.createConsumer(_destination); - } - - connection.start(); - - // Start the consumer pre-fetching - // Don't care about response as we will fill the broker up with messages - // after this point and ensure that the client is disconnected at the - // right point. - consumer.receiveNoWait(); - startPublisher(_destination); - - boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - jmsException.initCause(amqe); - _connectionException = jmsException; - } - } - - assertTrue("Client was not disconnected.", _connectionException != null); - - Exception linked = _connectionException.getLinkedException(); - - _publisher.join(JOIN_WAIT); - - assertFalse("Publisher still running", _publisher.isAlive()); - - //Validate publishing occurred ok - if (_publisherError != null) - { - throw _publisherError; - } - - // NOTE these exceptions will need to be modeled so that they are not - // 0-8 specific. e.g. JMSSessionClosedException - - assertNotNull("No error received onException listener.", _connectionException); - - assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); - - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; - - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); - } - - /** - * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT - * messages to the provided destination. Messages are sent in a new connection - * on a transaction. Any error is captured and the test is signalled to exit. - * - * @param destination - */ - private void startPublisher(final Destination destination) - { - _publisher = new Thread(new Runnable() - { - - public void run() - { - try - { - Connection connection = getConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer publisher = session.createProducer(destination); - - for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) - { - publisher.send(createNextMessage(session, count)); - session.commit(); - } - } - catch (Exception e) - { - _publisherError = e; - _disconnectionLatch.countDown(); - } - } - }); - - _publisher.start(); - } - - /** - * Test to write: Check that exclusive Transient Queues are not - * disconnected. i.e. JMS Temporary Queues - * - * @param ackMode - * - * @throws Exception - */ - public void exclusiveTransientQueue(int ackMode) throws Exception - { - - } - - /** - * Test that setting messageCount takes affect on topics - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - public void testTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); - - //Start the broker - super.setUp(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting depth has an effect on topics - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "depth", String.valueOf(MESSAGE_SIZE * 9)); - - //Start the broker - super.setUp(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Sets the messageAge to be half the disconnection wait timeout - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageAge", String.valueOf(DISCONNECTION_WAIT / 2)); - - //Start the broker - super.setUp(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageCount takes affect on a durable Consumer - * - * Ensure we set the delete-persistent option - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - - public void testTopicDurableConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - - //Start the broker - super.setUp(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting depth has an effect on durable consumer topics - * - * Ensure we set the delete-persistent option - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "depth", String.valueOf(MESSAGE_SIZE * 9)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - - //Start the broker - super.setUp(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Ensure we set the delete-persistent option - * - * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageAge", String.valueOf(DISCONNECTION_WAIT / 5)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - - //Start the broker - super.setUp(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - // Exception Listener - - public void onException(JMSException e) - { - _connectionException = e; - - System.out.println("***** SCT Received Exception: " + e); - e.printStackTrace(); - - _disconnectionLatch.countDown(); - } - - /// Connection Listener - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - // Prevent Failover - return false; - } - - public boolean preResubscribe() - { - return false; - } - - public void failoverComplete() - { - } -} -- cgit v1.2.1