diff options
Diffstat (limited to 'java')
7 files changed, 297 insertions, 28 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/java/broker-plugins/experimental/slowconsumerdetection/build.xml index 4d01b54f2d..e93b5e89de 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/build.xml +++ b/java/broker-plugins/experimental/slowconsumerdetection/build.xml @@ -21,7 +21,7 @@ nn - or more contributor license agreements. See the NOTICE file <project name="Slow Consumer Disconnect" default="build"> <property name="module.depends" value="common broker broker-plugins"/> - <property name="module.test.depends" value="broker/test systests client management/common"/> + <property name="module.test.depends" value="broker/test common/test systests client management/common"/> <property name="module.manifest" value="MANIFEST.MF"/> <property name="module.plugin" value="true"/> diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java index a413da387b..482168930b 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java @@ -25,8 +25,6 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import java.util.List; - public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin { @@ -61,4 +59,14 @@ public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugi return _configuration.getString("name"); } + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + super.setConfiguration(path,configuration); + + if (getPolicyName() == null) + { + throw new ConfigurationException("No Slow consumer policy defined."); + } + } + } diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index e69925f2b1..afd3059a77 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -105,8 +105,6 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { if (config != null) { - - if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) || (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || (config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge())) diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java new file mode 100644 index 0000000000..9a7ab67b85 --- /dev/null +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java @@ -0,0 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import junit.framework.TestCase; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class TopicDeletePolicyTest extends TestCase +{ + + TopicDeletePolicyConfiguration _config; + + VirtualHost _defaultVhost; + InternalTestProtocolSession _connection; + + public void setUp() throws ConfigurationException, AMQException + { + _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost(); + + _connection = new InternalTestProtocolSession(_defaultVhost); + + _config = new TopicDeletePolicyConfiguration(); + + XMLConfiguration config = new XMLConfiguration(); + + _config.setConfiguration("", config); + } + + public void tearDown() throws Exception + { + try + { + ApplicationRegistry.remove(); + } + finally + { + super.tearDown(); + } + } + + private MockAMQQueue createOwnedQueue() + { + MockAMQQueue queue = new MockAMQQueue("testQueue"); + + _defaultVhost.getQueueRegistry().registerQueue(queue); + + try + { + AMQChannel channel = new AMQChannel(_connection, 0, null); + _connection.addChannel(channel); + + queue.setExclusiveOwningSession(channel); + } + catch (AMQException e) + { + fail("Unable to create Channel:" + e.getMessage()); + } + + return queue; + } + + private void setQueueToAutoDelete(final AMQQueue queue) + { + ((MockAMQQueue) queue).setAutoDelete(true); + + queue.setDeleteOnNoConsumers(true); + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + public void doTask(AMQProtocolSession session) throws AMQException + { + queue.delete(); + } + }; + + ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask); + } + + + + /** Check that a null queue passed in does not upset the policy. */ + public void testNullQueueParameter() + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + try + { + policy.performPolicy(null); + } + catch (Exception e) + { + fail("Exception should not be thrown:" + e.getMessage()); + } + + } + + /** + * Set a owning Session to null which means this is not an exclusive queue + * so the queue should not be deleted + */ + public void testNonExclusiveQueue() + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.setExclusiveOwningSession(null); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + /** + * Test that exclusive JMS Queues are not deleted. + * Bind the queue to the direct exchange (so it is a JMS Queue). + * + * JMS Queues are not to be processed so this should not delete the queue. + */ + public void testQueuesAreNotProcessed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + + /** + * Give a non auto-delete queue is bound to the topic exchange the + * TopicDeletePolicy will close the connection and delete the queue, + */ + public void testNonAutoDeleteTopicIsNotClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + queue.setAutoDelete(false); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and delete the queue + */ + public void testTopicIsClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + final MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + setQueueToAutoDelete(queue); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and NOT delete the queue + */ + public void testNonAutoDeleteTopicIsClosedNotDeleted() throws AMQException + { + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy suitably + * configured with the delete-persistent tag will close the connection + * and delete the queue + */ + public void testPersistentTopicIsClosedAndDeleted() + { + _config.getConfig().addProperty("delete-persistent", ""); + + TopicDeletePolicy policy = new TopicDeletePolicy(_config); + + assertTrue("Config was not updated to delete Persistent topics", + _config.deletePersistent()); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + +} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java index b2cb29c33f..99c5b09885 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java @@ -73,7 +73,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, setConfigurationProperty("virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1) + ".queues.slow-consumer-detection." + - "policy[@name]", "TopicDelete"); + "policy.name", "TopicDelete"); setConfigurationProperty("virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1) + @@ -85,7 +85,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, * Queue Configuration <slow-consumer-detection> - <!-- The depth before which the policy will be applied--> + <!-- The depth before which the policy will be applied--> <depth>4235264</depth> <!-- The message age before which the policy will be applied--> @@ -96,7 +96,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, <!-- Policies configuration --> <policy> - <name>TopicDelete"</name> + <name>TopicDelete</name> <topicDelete> <delete-persistent/> </topicDelete> @@ -107,10 +107,10 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, /** * Plugin Configuration - * + <slow-consumer-detection> - <delay>1</delay> - <timeunit>MINUTES</timeunit> + <delay>1</delay> + <timeunit>MINUTES</timeunit> </slow-consumer-detection> */ diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 681e513ecb..3b6cd37ea9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -28,10 +28,13 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.transport.TestNetworkDriver; @@ -196,4 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } + + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + super.closeSession(session, cause, message); + + //Simulate the Client responding with a CloseOK + // should really update the StateManger but we don't have access here + // changeState(AMQState.CONNECTION_CLOSED); + ((AMQChannel)session).getProtocolSession().closeSession(); + + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 1314a6e9d3..df92879b2e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -35,10 +35,12 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; +import javax.swing.*; import java.util.List; import java.util.Set; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; public class MockAMQQueue implements AMQQueue { @@ -49,6 +51,9 @@ public class MockAMQQueue implements AMQQueue private PrincipalHolder _principalHolder; private AMQSessionModel _exclusiveOwner; + private AMQShortString _owner; + private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); + private boolean _autoDelete; public MockAMQQueue(String name) { @@ -66,17 +71,17 @@ public class MockAMQQueue implements AMQQueue public void addBinding(final Binding binding) { - + _bindings.add(binding); } public void removeBinding(final Binding binding) { - + _bindings.remove(binding); } public List<Binding> getBindings() { - return null; + return _bindings; } public int getBindingCount() @@ -171,9 +176,15 @@ public class MockAMQQueue implements AMQQueue public boolean isAutoDelete() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return _autoDelete; } + public void setAutoDelete(boolean autodelete) + { + _autoDelete = autodelete; + } + + public AMQShortString getOwner() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -194,17 +205,6 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } - public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -271,8 +271,9 @@ public class MockAMQQueue implements AMQQueue } public int delete() throws AMQException - { - return 0; //To change body of implemented methods use File | Settings | File Templates. + { + _deleted = true; + return getMessageCount(); } public void enqueue(ServerMessage message) throws AMQException |
