summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/build.xml2
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java12
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java2
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java248
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java35
7 files changed, 297 insertions, 28 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/java/broker-plugins/experimental/slowconsumerdetection/build.xml
index 4d01b54f2d..e93b5e89de 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/build.xml
+++ b/java/broker-plugins/experimental/slowconsumerdetection/build.xml
@@ -21,7 +21,7 @@ nn - or more contributor license agreements. See the NOTICE file
<project name="Slow Consumer Disconnect" default="build">
<property name="module.depends" value="common broker broker-plugins"/>
- <property name="module.test.depends" value="broker/test systests client management/common"/>
+ <property name="module.test.depends" value="broker/test common/test systests client management/common"/>
<property name="module.manifest" value="MANIFEST.MF"/>
<property name="module.plugin" value="true"/>
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
index a413da387b..482168930b 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
@@ -25,8 +25,6 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import java.util.List;
-
public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
{
@@ -61,4 +59,14 @@ public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugi
return _configuration.getString("name");
}
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path,configuration);
+
+ if (getPolicyName() == null)
+ {
+ throw new ConfigurationException("No Slow consumer policy defined.");
+ }
+ }
+
}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
index e69925f2b1..afd3059a77 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
@@ -105,8 +105,6 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
if (config != null)
{
-
-
if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) ||
(config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
(config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge()))
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
new file mode 100644
index 0000000000..9a7ab67b85
--- /dev/null
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.policies;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class TopicDeletePolicyTest extends TestCase
+{
+
+ TopicDeletePolicyConfiguration _config;
+
+ VirtualHost _defaultVhost;
+ InternalTestProtocolSession _connection;
+
+ public void setUp() throws ConfigurationException, AMQException
+ {
+ _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost();
+
+ _connection = new InternalTestProtocolSession(_defaultVhost);
+
+ _config = new TopicDeletePolicyConfiguration();
+
+ XMLConfiguration config = new XMLConfiguration();
+
+ _config.setConfiguration("", config);
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry.remove();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ private MockAMQQueue createOwnedQueue()
+ {
+ MockAMQQueue queue = new MockAMQQueue("testQueue");
+
+ _defaultVhost.getQueueRegistry().registerQueue(queue);
+
+ try
+ {
+ AMQChannel channel = new AMQChannel(_connection, 0, null);
+ _connection.addChannel(channel);
+
+ queue.setExclusiveOwningSession(channel);
+ }
+ catch (AMQException e)
+ {
+ fail("Unable to create Channel:" + e.getMessage());
+ }
+
+ return queue;
+ }
+
+ private void setQueueToAutoDelete(final AMQQueue queue)
+ {
+ ((MockAMQQueue) queue).setAutoDelete(true);
+
+ queue.setDeleteOnNoConsumers(true);
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ queue.delete();
+ }
+ };
+
+ ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask);
+ }
+
+
+
+ /** Check that a null queue passed in does not upset the policy. */
+ public void testNullQueueParameter()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ try
+ {
+ policy.performPolicy(null);
+ }
+ catch (Exception e)
+ {
+ fail("Exception should not be thrown:" + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Set a owning Session to null which means this is not an exclusive queue
+ * so the queue should not be deleted
+ */
+ public void testNonExclusiveQueue()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.setExclusiveOwningSession(null);
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertFalse("Connection should not be closed", _connection.isClosed());
+ }
+
+ /**
+ * Test that exclusive JMS Queues are not deleted.
+ * Bind the queue to the direct exchange (so it is a JMS Queue).
+ *
+ * JMS Queues are not to be processed so this should not delete the queue.
+ */
+ public void testQueuesAreNotProcessed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertFalse("Connection should not be closed", _connection.isClosed());
+ }
+
+
+ /**
+ * Give a non auto-delete queue is bound to the topic exchange the
+ * TopicDeletePolicy will close the connection and delete the queue,
+ */
+ public void testNonAutoDeleteTopicIsNotClosed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ queue.setAutoDelete(false);
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will
+ * close the connection and delete the queue
+ */
+ public void testTopicIsClosed()
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ final MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ setQueueToAutoDelete(queue);
+
+ policy.performPolicy(queue);
+
+ assertTrue("Queue should be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Give a queue bound to the topic exchange the TopicDeletePolicy will
+ * close the connection and NOT delete the queue
+ */
+ public void testNonAutoDeleteTopicIsClosedNotDeleted() throws AMQException
+ {
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertFalse("Queue should not be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ /**
+ * Give a queue bound to the topic exchange the TopicDeletePolicy suitably
+ * configured with the delete-persistent tag will close the connection
+ * and delete the queue
+ */
+ public void testPersistentTopicIsClosedAndDeleted()
+ {
+ _config.getConfig().addProperty("delete-persistent", "");
+
+ TopicDeletePolicy policy = new TopicDeletePolicy(_config);
+
+ assertTrue("Config was not updated to delete Persistent topics",
+ _config.deletePersistent());
+
+ MockAMQQueue queue = createOwnedQueue();
+
+ queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
+
+ policy.performPolicy(queue);
+
+ assertTrue("Queue should be deleted", queue.isDeleted());
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
index b2cb29c33f..99c5b09885 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
@@ -73,7 +73,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener,
setConfigurationProperty("virtualhosts.virtualhost."
+ getConnectionURL().getVirtualHost().substring(1) +
".queues.slow-consumer-detection." +
- "policy[@name]", "TopicDelete");
+ "policy.name", "TopicDelete");
setConfigurationProperty("virtualhosts.virtualhost."
+ getConnectionURL().getVirtualHost().substring(1) +
@@ -85,7 +85,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener,
* Queue Configuration
<slow-consumer-detection>
- <!-- The depth before which the policy will be applied-->
+ <!-- The depth before which the policy will be applied-->
<depth>4235264</depth>
<!-- The message age before which the policy will be applied-->
@@ -96,7 +96,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener,
<!-- Policies configuration -->
<policy>
- <name>TopicDelete"</name>
+ <name>TopicDelete</name>
<topicDelete>
<delete-persistent/>
</topicDelete>
@@ -107,10 +107,10 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener,
/**
* Plugin Configuration
- *
+
<slow-consumer-detection>
- <delay>1</delay>
- <timeunit>MINUTES</timeunit>
+ <delay>1</delay>
+ <timeunit>MINUTES</timeunit>
</slow-consumer-detection>
*/
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 681e513ecb..3b6cd37ea9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -28,10 +28,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.transport.TestNetworkDriver;
@@ -196,4 +199,15 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
// The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
// Then the AMQMinaProtocolSession can join on the returning future without a NPE.
}
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ super.closeSession(session, cause, message);
+
+ //Simulate the Client responding with a CloseOK
+ // should really update the StateManger but we don't have access here
+ // changeState(AMQState.CONNECTION_CLOSED);
+ ((AMQChannel)session).getProtocolSession().closeSession();
+
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 1314a6e9d3..df92879b2e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -35,10 +35,12 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
+import javax.swing.*;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
public class MockAMQQueue implements AMQQueue
{
@@ -49,6 +51,9 @@ public class MockAMQQueue implements AMQQueue
private PrincipalHolder _principalHolder;
private AMQSessionModel _exclusiveOwner;
+ private AMQShortString _owner;
+ private List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+ private boolean _autoDelete;
public MockAMQQueue(String name)
{
@@ -66,17 +71,17 @@ public class MockAMQQueue implements AMQQueue
public void addBinding(final Binding binding)
{
-
+ _bindings.add(binding);
}
public void removeBinding(final Binding binding)
{
-
+ _bindings.remove(binding);
}
public List<Binding> getBindings()
{
- return null;
+ return _bindings;
}
public int getBindingCount()
@@ -171,9 +176,15 @@ public class MockAMQQueue implements AMQQueue
public boolean isAutoDelete()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return _autoDelete;
}
+ public void setAutoDelete(boolean autodelete)
+ {
+ _autoDelete = autodelete;
+ }
+
+
public AMQShortString getOwner()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -194,17 +205,6 @@ public class MockAMQQueue implements AMQQueue
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -271,8 +271,9 @@ public class MockAMQQueue implements AMQQueue
}
public int delete() throws AMQException
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ {
+ _deleted = true;
+ return getMessageCount();
}
public void enqueue(ServerMessage message) throws AMQException