diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
| commit | 8dbbb9ad305ca936fb924420ec31e27e9a9d0caf (patch) | |
| tree | 949ab5a030574ea199ee9821367eedb0c84131b6 /java/broker | |
| parent | cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (diff) | |
| download | qpid-python-8dbbb9ad305ca936fb924420ec31e27e9a9d0caf.tar.gz | |
QPID-2682: Move slow consumer disconnection mechanism to the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
17 files changed, 1919 insertions, 40 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java new file mode 100644 index 0000000000..dd63c9b698 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.ConversionException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection"); + } + } + + //Set Default time unit to seconds + TimeUnit _timeUnit = TimeUnit.SECONDS; + + public String[] getElementsProcessed() + { + return new String[]{"delay", + "timeunit"}; + } + + public long getDelay() + { + return getLongValue("delay", 10); + } + + public TimeUnit getTimeUnit() + { + return _timeUnit; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + validatePositiveLong("delay"); + + String timeUnit = getStringValue("timeunit"); + + if (timeUnit != null) + { + try + { + _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase()); + } + catch (IllegalArgumentException iae) + { + throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit); + } + } + + System.out.println("Configured SCDC"); + System.out.println("Delay:" + getDelay()); + System.out.println("TimeUnit:" + getTimeUnit()); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java new file mode 100644 index 0000000000..8e2ecff6fb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; + +public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"name"}; + } + + public String getPolicyName() + { + return getStringValue("name"); + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (getPolicyName() == null) + { + throw new ConfigurationException("No Slow consumer policy defined."); + } + } + + @Override + public String formatToString() + { + return "Policy:"+getPolicyName(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java new file mode 100644 index 0000000000..58131760da --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin +{ + private SlowConsumerPolicyPlugin _policyPlugin; + + public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", + "virtualhosts.virtualhost.topics.slow-consumer-detection", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"messageAge", + "depth", + "messageCount"}; + } + + public long getMessageAge() + { + return getLongValue("messageAge"); + } + + public long getDepth() + { + return getLongValue("depth"); + } + + public long getMessageCount() + { + return getLongValue("messageCount"); + } + + public SlowConsumerPolicyPlugin getPolicy() + { + return _policyPlugin; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (!containsPositiveLong("messageAge") && + !containsPositiveLong("depth") && + !containsPositiveLong("messageCount")) + { + throw new ConfigurationException("At least one configuration property" + + "('messageAge','depth' or 'messageCount') must be specified."); + } + + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); + + PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins(); + + if (policyConfig == null) + { + throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet()); + } + + if (_logger.isDebugEnabled()) + { + Iterator<?> keys = policyConfig.getConfig().getKeys(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + _logger.debug("Policy Keys:" + key); + } + + } + + SlowConsumerPolicyPluginFactory<SlowConsumerPolicyPlugin> pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); + + if (pluginFactory == null) + { + throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); + } + + _policyPlugin = pluginFactory.newInstance(policyConfig); + + // Debug the creation of this Config + _logger.debug(this); + } + + public String formatToString() + { + StringBuilder sb = new StringBuilder(); + if (getMessageAge() > 0) + { + sb.append("Age:").append(getMessageAge()).append(":"); + } + if (getDepth() > 0) + { + sb.append("Depth:").append(getDepth()).append(":"); + } + if (getMessageCount() > 0) + { + sb.append("Count:").append(getMessageCount()).append(":"); + } + + sb.append("Policy[").append(getPolicy()).append("]"); + return sb.toString(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 717f0d1bee..97c43b940b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -35,6 +35,9 @@ import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; @@ -43,6 +46,9 @@ import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.access.plugins.DenyAll; import org.apache.qpid.server.security.access.plugins.LegacyAccess; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.server.virtualhost.plugin.SlowConsumerDetection; +import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleException; import org.osgi.framework.launch.Framework; @@ -57,7 +63,6 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String VERSION = "2.6.0.4"; private Framework _felix; @@ -65,11 +70,14 @@ public class PluginManager implements Closeable private ServiceTracker _securityTracker = null; private ServiceTracker _configTracker = null; private ServiceTracker _virtualHostTracker = null; + private ServiceTracker _policyTracker = null; private Activator _activator; private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>(); private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); + private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>(); + private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>(); public PluginManager(String pluginPath, String cachePath) throws Exception { @@ -85,10 +93,23 @@ public class PluginManager implements Closeable SecurityManager.SecurityConfiguration.FACTORY, AllowAll.AllowAllConfiguration.FACTORY, DenyAll.DenyAllConfiguration.FACTORY, - LegacyAccess.LegacyAccessConfiguration.FACTORY)) + LegacyAccess.LegacyAccessConfiguration.FACTORY, + new SlowConsumerDetectionConfigurationFactory(), + new SlowConsumerDetectionPolicyConfigurationFactory(), + new SlowConsumerDetectionQueueConfigurationFactory())) { _configPlugins.put(configFactory.getParentPaths(), configFactory); } + for (SlowConsumerPolicyPluginFactory pluginFactory : Arrays.asList( + new TopicDeletePolicy.TopicDeletePolicyFactory())) + { + _policyPlugins.put(pluginFactory.getPluginName(), pluginFactory); + } + for (VirtualHostPluginFactory pluginFactory : Arrays.asList( + new SlowConsumerDetection.SlowConsumerFactory())) + { + _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory); + } // Check the plugin directory path is set and exist if (pluginPath == null) @@ -117,6 +138,7 @@ public class PluginManager implements Closeable "org.apache.qpid.common; version=0.7," + "org.apache.qpid.exchange; version=0.7," + "org.apache.qpid.framing; version=0.7," + + "org.apache.qpid.management.common.mbeans.annotations; version=0.7," + "org.apache.qpid.protocol; version=0.7," + "org.apache.qpid.server.binding; version=0.7," + "org.apache.qpid.server.configuration; version=0.7," + @@ -157,7 +179,7 @@ public class PluginManager implements Closeable configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); if (cachePath != null) - { + { File cacheDir = new File(cachePath); if (!cacheDir.exists() && cacheDir.canWrite()) { @@ -204,12 +226,11 @@ public class PluginManager implements Closeable _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null); _virtualHostTracker.open(); - - _logger.info("Opened service trackers"); + + _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); + _policyTracker.open(); - // Load security and configuration plugins from their trackers for access - _configPlugins.putAll(getConfigurationServices()); - _securityPlugins.putAll(getPlugins(SecurityPluginFactory.class)); + _logger.info("Opened service trackers"); } private static <T> Map<String, T> getServices(ServiceTracker tracker) @@ -234,11 +255,18 @@ public class PluginManager implements Closeable return services; } - private Map<List<String>, ConfigurationPluginFactory> getConfigurationServices() + public static <T> Map<String, T> getServices(ServiceTracker tracker, Map<String, T> plugins) + { + Map<String, T> services = getServices(tracker); + services.putAll(plugins); + return services; + } + + public Map<List<String>, ConfigurationPluginFactory> getConfigurationPlugins() { Map<List<String>, ConfigurationPluginFactory> services = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); - if (_configTracker.getServices() != null) + if (_configTracker != null && _configTracker.getServices() != null) { for (Object service : _configTracker.getServices()) { @@ -246,49 +274,30 @@ public class PluginManager implements Closeable services.put(factory.getParentPaths(), factory); } } + + services.putAll(_configPlugins); return services; } - public Map<String, ExchangeType<?>> getExchanges() - { - return getServices(_exchangeTracker); + public Map<String, VirtualHostPluginFactory> getVirtualHostPlugins() + { + return getServices(_virtualHostTracker, _vhostPlugins); } - public Map<String, VirtualHostPluginFactory> getVirtualHostPlugins() - { - return getServices(_virtualHostTracker); + public Map<String, SlowConsumerPolicyPluginFactory> getSlowConsumerPlugins() + { + return getServices(_policyTracker, _policyPlugins); } - public <P extends PluginFactory<?>> Map<String, P> getPlugins(Class<P> plugin) + public Map<String, ExchangeType<?>> getExchanges() { - // If plugins are not configured then return an empty set - if (_activator == null) - { - return new HashMap<String, P>(); - } - - ServiceTracker tracker = new ServiceTracker(_activator.getContext(), plugin.getName(), null); - tracker.open(); - - try - { - return getServices(tracker); - } - finally - { - tracker.close(); - } + return getServices(_exchangeTracker); } public Map<String, SecurityPluginFactory> getSecurityPlugins() { - return _securityPlugins; - } - - public Map<List<String>, ConfigurationPluginFactory> getConfigurationPlugins() - { - return _configPlugins; + return getServices(_securityTracker, _securityPlugins); } public void close() @@ -302,6 +311,7 @@ public class PluginManager implements Closeable _securityTracker.close(); _configTracker.close(); _virtualHostTracker.close(); + _policyTracker.close(); } finally { diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java new file mode 100644 index 0000000000..d947e9a367 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java @@ -0,0 +1,86 @@ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.Exchange.BindingListener; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * This is a listener that caches queues that are configured for slow consumer disconnection. + * + * There should be one listener per virtual host, which can be added to all exchanges on + * that host. + * + * TODO In future, it will be possible to configure the policy at runtime, so only the queue + * itself is cached, and the configuration looked up by the housekeeping thread. This means + * that there may be occasions where the copy of the cache contents retrieved by the thread + * does not contain queues that are configured, or that configured queues are not present. + * + * @see BindingListener + */ +public class ConfiguredQueueBindingListener implements BindingListener +{ + private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); + + private String _vhostName; + private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>()); + + public ConfiguredQueueBindingListener(String vhostName) + { + _vhostName = vhostName; + } + + /** + * @see BindingListener#bindingAdded(Exchange, Binding) + */ + public void bindingAdded(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + /** + * @see BindingListener#bindingRemoved(Exchange, Binding) + */ + public void bindingRemoved(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + private void processBinding(Binding binding) + { + AMQQueue queue = binding.getQueue(); + + SlowConsumerDetectionQueueConfiguration config = + queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (config != null) + { + _cache.add(queue); + } + else + { + _cache.remove(queue); + } + } + + /** + * Lookup and return the cache of configured {@link AMQQueue}s. + * + * Note that when accessing the cached queues, the {@link Iterator} is not thread safe + * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the + * cache is returned. + * + * @return a copy of the cached {@link java.util.Set} of queues + */ + public Set<AMQQueue> getQueueCache() + { + return new HashSet<AMQQueue>(_cache); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java new file mode 100644 index 0000000000..6acb0bc11e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -0,0 +1,161 @@ +/* + * + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.plugins.Plugin; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; + +public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin +{ + private SlowConsumerDetectionConfiguration _config; + private ConfiguredQueueBindingListener _listener; + + public static class SlowConsumerFactory implements VirtualHostPluginFactory + { + public SlowConsumerDetection newInstance(VirtualHost vhost) + { + SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); + + if (config == null) + { + return null; + } + + SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); + plugin.configure(config); + return plugin; + } + } + + /** + * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this + * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * thread. + * + * @see Plugin#configure(ConfigurationPlugin) + */ + public void configure(ConfigurationPlugin config) + { + _config = (SlowConsumerDetectionConfiguration) config; + _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); + for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) + { + _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + } + } + + public SlowConsumerDetection(VirtualHost vhost) + { + super(vhost); + } + + public void execute() + { + CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); + + Set<AMQQueue> cache = _listener.getQueueCache(); + for (AMQQueue q : cache) + { + CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); + + try + { + SlowConsumerDetectionQueueConfiguration config = + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (checkQueueStatus(q, config)) + { + config.getPolicy().performPolicy(q); + } + } + catch (Exception e) + { + // Don't throw exceptions as this will stop the house keeping task from running. + _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); + } + } + + CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); + } + + public long getDelay() + { + return _config.getDelay(); + } + + public TimeUnit getTimeUnit() + { + return _config.getTimeUnit(); + } + + /** + * Check the depth,messageSize,messageAge,messageCount values for this q + * + * @param q the queue to check + * @param config the queue configuration to compare against the queue state + * + * @return true if the queue has reached a threshold. + */ + private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) + { + if (config != null) + { + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + + int count = q.getMessageCount(); + + // First Check message counts + if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || + // The check queue depth + (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || + // finally if we have messages on the queue check Arrival time. + // We must check count as OldestArrival time is Long.MAX_LONG when + // there are no messages. + (config.getMessageAge() != 0 && + ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); + _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); + _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); + _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); + } + + return true; + } + } + return false; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties new file mode 100644 index 0000000000..2714935a71 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties @@ -0,0 +1,4 @@ +#SlowConsumerDetection.logMessages +RUNNING = SCD-1001 : Running +COMPLETE = SCD-1002 : Complete +CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties new file mode 100644 index 0000000000..d0f5965c39 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties @@ -0,0 +1,3 @@ +#TopicDeletePolicy.logMessages +DELETING_QUEUE = TDP-1001 : Deleting Queue +DISCONNECTING = TDP-1002 : Disconnecting Session
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java new file mode 100644 index 0000000000..3bd4ae8d4e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.plugin.logging.TopicDeletePolicyMessages; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +public class TopicDeletePolicy implements SlowConsumerPolicyPlugin +{ + Logger _logger = Logger.getLogger(TopicDeletePolicy.class); + private TopicDeletePolicyConfiguration _configuration; + + public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory + { + public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException + { + TopicDeletePolicyConfiguration config = + configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); + + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(config); + return policy; + } + + public String getPluginName() + { + return "topicdelete"; + } + + public Class<TopicDeletePolicy> getPluginClass() + { + return TopicDeletePolicy.class; + } + } + + public void performPolicy(AMQQueue q) + { + if (q == null) + { + return; + } + + AMQSessionModel owner = q.getExclusiveOwningSession(); + + // Only process exclusive queues + if (owner == null) + { + return; + } + + //Only process Topics + if (!validateQueueIsATopic(q)) + { + return; + } + + try + { + CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); + // Close the consumer . this will cause autoDelete Queues to be purged + owner.getConnectionModel(). + closeSession(owner, AMQConstant.RESOURCE_ERROR, + "Consuming to slow."); + + // Actively delete non autoDelete queues if deletePersistent is set + if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) + { + CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); + q.delete(); + } + + } + catch (AMQException e) + { + _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); + } + + } + + /** + * Check the queue bindings to validate the queue is bound to the + * topic exchange. + * + * @param q the Queue + * + * @return true iff Q is bound to a TopicExchange + */ + private boolean validateQueueIsATopic(AMQQueue q) + { + for (Binding binding : q.getBindings()) + { + if (binding.getExchange() instanceof TopicExchange) + { + return true; + } + } + + return false; + } + + public void configure(ConfigurationPlugin config) + { + _configuration = (TopicDeletePolicyConfiguration) config; + } + + @Override + public String toString() + { + return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java new file mode 100644 index 0000000000..e6ad1cbcc3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +public class TopicDeletePolicyConfiguration extends ConfigurationPlugin +{ + + public static class TopicDeletePolicyConfigurationFactory + implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, + Configuration config) + throws ConfigurationException + { + TopicDeletePolicyConfiguration slowConsumerConfig = + new TopicDeletePolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"delete-persistent"}; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + // No validation required. + } + + public boolean deletePersistent() + { + // If we don't have configuration then we don't deletePersistent Queues + return (hasConfiguration() && contains("delete-persistent")); + } + + @Override + public String formatToString() + { + return (deletePersistent()?"delete-durable":""); + } + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java new file mode 100644 index 0000000000..7f600abdc9 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.plugins.Plugin; +import org.apache.qpid.server.queue.AMQQueue; + +public interface SlowConsumerPolicyPlugin extends Plugin +{ + public void performPolicy(AMQQueue Queue); +} diff --git a/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java new file mode 100644 index 0000000000..b2fe6766a6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.plugins.PluginFactory; + +public interface SlowConsumerPolicyPluginFactory<P extends SlowConsumerPolicyPlugin> extends PluginFactory<P> +{ +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java new file mode 100644 index 0000000000..40dc382d30 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +import java.util.concurrent.TimeUnit; + +/** + * Provide Unit Test coverage of the virtualhost SlowConsumer Configuration + * This is what controls how often the plugin will execute + */ +public class SlowConsumerDetectionConfigurationTest extends InternalBrokerBaseCase +{ + + /** + * Default Testing: + * + * Provide a fully complete and valid configuration specifying 'delay' and + * 'timeunit' and ensure that it is correctly processed. + * + * Ensure no exceptions are thrown and that we get the same values back that + * were put into the configuration. + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + String TIMEUNIT=TimeUnit.MICROSECONDS.toString(); + xmlconfig.addProperty("delay", String.valueOf(DELAY)); + xmlconfig.addProperty("timeunit", TIMEUNIT); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("TimeUnit not correctly returned.", + TIMEUNIT, String.valueOf(config.getTimeUnit())); + } + + /** + * Default Testing: + * + * Test Missing TimeUnit value gets default. + * + * The TimeUnit value is optional and default to SECONDS. + * + * Test that if we do not specify a TimeUnit then we correctly get seconds. + * + * Also verify that relying on the default does not impact the setting of + * the 'delay' value. + * + */ + public void testConfigLoadingMissingTimeUnitDefaults() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + xmlconfig.addProperty("delay", String.valueOf(DELAY)); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("Default TimeUnit incorrect", TimeUnit.SECONDS, config.getTimeUnit()); + } + + /** + * Input Testing: + * + * TimeUnit parsing requires the String value be in UpperCase. + * Ensure we can handle when the user doesn't know this. + * + * Same test as 'testConfigLoadingValidConfig' but checking that + * the timeunit field is not case sensitive. + * i.e. the toUpper is being correctly applied. + */ + public void testConfigLoadingValidConfigStrangeTimeUnit() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + + xmlconfig.addProperty("delay", DELAY); + xmlconfig.addProperty("timeunit", "MiCrOsEcOnDs"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("TimeUnit not correctly returned.", + TimeUnit.MICROSECONDS.toString(), String.valueOf(config.getTimeUnit())); + + } + + /** + * Failure Testing: + * + * Test that delay must be long not a string value. + * Provide a delay as a written value not a long. 'ten'. + * + * This should throw a configuration exception which is being trapped and + * verified to be the right exception, a NumberFormatException. + * + */ + public void testConfigLoadingInValidDelayString() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "ten"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertEquals("Cause not correct", NumberFormatException.class, cause.getClass()); + } + } + + /** + * Failure Testing: + * + * Test that negative delays are invalid. + * + * Delay must be a positive value as negative delay means doesn't make sense. + * + * Configuration exception with a useful message should be thrown here. + * + */ + public void testConfigLoadingInValidDelayNegative() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "-10"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertNotNull("Configuration Exception must not be null.", cause); + assertEquals("Cause not correct", + ConfigurationException.class, cause.getClass()); + assertEquals("Incorrect message.", + "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", + cause.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that delay cannot be 0. + * + * A zero delay means run constantly. This is not how VirtualHostTasks + * are designed to be run so we dis-allow the use of 0 delay. + * + * Same test as 'testConfigLoadingInValidDelayNegative' but with a 0 value. + * + */ + public void testConfigLoadingInValidDelayZero() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "0"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertNotNull("Configuration Exception must not be null.", cause); + assertEquals("Cause not correct", + ConfigurationException.class, cause.getClass()); + assertEquals("Incorrect message.", + "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", + cause.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that missing delay fails. + * If we have no delay then we do not pick a default. So a Configuration + * Exception is thrown. + * + * */ + public void testConfigLoadingInValidMissingDelay() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("timeunit", TimeUnit.SECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + assertEquals("Incorrect message.", "SlowConsumerDetectionConfiguration: unable to configure invalid delay:null", e.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that erroneous TimeUnit fails. + * + * Valid TimeUnit values vary based on the JVM version i.e. 1.6 added HOURS/DAYS etc. + * + * We don't test the values for TimeUnit are accepted other than MILLISECONDS in the + * positive testing at the start. + * + * Here we ensure that an erroneous for TimeUnit correctly throws an exception. + * + * We test with 'foo', which will never be a TimeUnit + * + */ + public void testConfigLoadingInValidTimeUnit() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + String TIMEUNIT = "foo"; + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "10"); + xmlconfig.addProperty("timeunit", TIMEUNIT); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + assertEquals("Incorrect message.", "Unable to configure Slow Consumer Detection invalid TimeUnit:" + TIMEUNIT, e.getMessage()); + } + } + + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java new file mode 100644 index 0000000000..67c177f099 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Test class to ensure that the policy configuration can be processed. + */ +public class SlowConsumerDetectionPolicyConfigurationTest extends InternalBrokerBaseCase +{ + + /** + * Input Testing: + * + * Test that a given String can be set and retrieved through the configuration + * + * No validation is being performed to ensure that the policy exists. Only + * that a value can be set for the policy. + * + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + String policyName = "TestPolicy"; + xmlconfig.addProperty("name", policyName); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Policy name not retrieved as expected.", + policyName, config.getPolicyName()); + } + + /** + * Failure Testing: + * + * Test that providing a configuration section without the 'name' field + * causes an exception to be thrown. + * + * An empty configuration is provided and the thrown exception message + * is checked to confirm the right reason. + * + */ + public void testConfigLoadingInValidConfig() + { + SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Config is invalid so won't validate."); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + assertEquals("Exception message not as expected.", "No Slow consumer policy defined.", e.getMessage()); + } + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java new file mode 100644 index 0000000000..23828d5c61 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Unit test the QueueConfiguration processing. + * + * This is slightly awkward as the {@link SlowConsumerDetectionQueueConfiguration} + * requries that a policy be available. + * <p> + * So all the Valid test much catch the ensuing {@link ConfigurationException} and + * validate that the error is due to a lack of a valid policy. + */ +public class SlowConsumerDetectionQueueConfigurationTest extends InternalBrokerBaseCase +{ + /** + * Test a fully loaded configuration file. + * + * It is not an error to have all control values specified. + * <p> + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("messageAge", "60000"); + xmlconfig.addProperty("depth", "1024"); + xmlconfig.addProperty("messageCount", "10"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * When we do not specify any control value then a {@link ConfigurationException} + * must be thrown to remind us. + */ + public void testConfigLoadingMissingConfig() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + + assertEquals("At least one configuration property('messageAge','depth'" + + " or 'messageCount') must be specified.", e.getMessage()); + } + } + + /** + * Setting messageAge on its own is enough to have a valid configuration + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingMessageAgeOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("messageAge", "60000"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * Setting depth on its own is enough to have a valid configuration. + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingDepthOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("depth", "1024"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * Setting messageCount on its own is enough to have a valid configuration. + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingMessageCountOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("messageCount", "10"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java new file mode 100644 index 0000000000..8b729a0f43 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Test to ensure TopicDelete Policy configuration can be loaded. + */ +public class TopicDeletePolicyConfigurationTest extends InternalBrokerBaseCase +{ + /** + * Test without any configuration being provided that the + * deletePersistent option is disabled. + */ + public void testNoConfigNoDeletePersistent() + { + TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); + + assertFalse("TopicDelete Configuration with no config should not delete persistent queues.", + config.deletePersistent()); + } + + /** + * Test that with the correct configuration the deletePersistent option can + * be enabled. + * + * Test creates a new Configuration object and passes in the xml snippet + * that the ConfigurationPlugin would receive during normal execution. + * This is the XML that would be matched for this plugin: + * <topicdelete> + * <delete-persistent> + * <topicdelete> + * + * So it would be subset and passed in as just: + * <delete-persistent> + * + * + * The property should therefore be enabled. + * + */ + public void testConfigDeletePersistent() + { + TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delete-persistent",""); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("",composite); + } + catch (ConfigurationException e) + { + fail(e.getMessage()); + } + + assertTrue("A configured TopicDelete should delete persistent queues.", + config.deletePersistent()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java new file mode 100644 index 0000000000..364766dfa7 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class TopicDeletePolicyTest extends InternalBrokerBaseCase +{ + + TopicDeletePolicyConfiguration _config; + + VirtualHost _defaultVhost; + InternalTestProtocolSession _connection; + + public void setUp() throws Exception + { + super.setUp(); + + _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost(); + + _connection = new InternalTestProtocolSession(_defaultVhost); + + _config = new TopicDeletePolicyConfiguration(); + + XMLConfiguration config = new XMLConfiguration(); + + _config.setConfiguration("", config); + } + + private MockAMQQueue createOwnedQueue() + { + MockAMQQueue queue = new MockAMQQueue("testQueue"); + + _defaultVhost.getQueueRegistry().registerQueue(queue); + + try + { + AMQChannel channel = new AMQChannel(_connection, 0, null); + _connection.addChannel(channel); + + queue.setExclusiveOwningSession(channel); + } + catch (AMQException e) + { + fail("Unable to create Channel:" + e.getMessage()); + } + + return queue; + } + + private void setQueueToAutoDelete(final AMQQueue queue) + { + ((MockAMQQueue) queue).setAutoDelete(true); + + queue.setDeleteOnNoConsumers(true); + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + public void doTask(AMQProtocolSession session) throws AMQException + { + queue.delete(); + } + }; + + ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask); + } + + /** Check that a null queue passed in does not upset the policy. */ + public void testNullQueueParameter() throws ConfigurationException + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + try + { + policy.performPolicy(null); + } + catch (Exception e) + { + fail("Exception should not be thrown:" + e.getMessage()); + } + + } + + /** + * Set a owning Session to null which means this is not an exclusive queue + * so the queue should not be deleted + */ + public void testNonExclusiveQueue() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.setExclusiveOwningSession(null); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + /** + * Test that exclusive JMS Queues are not deleted. + * Bind the queue to the direct exchange (so it is a JMS Queue). + * + * JMS Queues are not to be processed so this should not delete the queue. + */ + public void testQueuesAreNotProcessed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + /** + * Give a non auto-delete queue is bound to the topic exchange the + * TopicDeletePolicy will close the connection and delete the queue, + */ + public void testNonAutoDeleteTopicIsNotClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + queue.setAutoDelete(false); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and delete the queue + */ + public void testTopicIsClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + final MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + setQueueToAutoDelete(queue); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and NOT delete the queue + */ + public void testNonAutoDeleteTopicIsClosedNotDeleted() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy suitably + * configured with the delete-persistent tag will close the connection + * and delete the queue + */ + public void testPersistentTopicIsClosedAndDeleted() + { + //Set the config to delete persistent queues + _config.getConfig().addProperty("delete-persistent", ""); + + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + assertTrue("Config was not updated to delete Persistent topics", + _config.deletePersistent()); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy not + * configured to close a persistent queue + */ + public void testPersistentTopicIsClosedAndDeletedNullConfig() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + // Explicity say we are not configuring the policy. + policy.configure(null); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + public void testNonExclusiveQueueNullConfig() + { + _config = null; + testNonExclusiveQueue(); + } + + public void testQueuesAreNotProcessedNullConfig() + { + _config = null; + testQueuesAreNotProcessed(); + } + + public void testNonAutoDeleteTopicIsNotClosedNullConfig() + { + _config = null; + testNonAutoDeleteTopicIsNotClosed(); + } + + public void testTopicIsClosedNullConfig() + { + _config = null; + testTopicIsClosed(); + } + + public void testNonAutoDeleteTopicIsClosedNotDeletedNullConfig() throws AMQException + { + _config = null; + testNonAutoDeleteTopicIsClosedNotDeleted(); + } + +} |
