diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-03 14:56:40 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-03 14:56:40 +0000 |
| commit | 9dc57fe738f366d875c2319dafdfa2c50ce2f20b (patch) | |
| tree | be6634866a966f358fcb1ba6ba29dfb5c9c340c1 /qpid/java/broker-core/src | |
| parent | fe37626d4fd8fb3ee5b3146a5159024a3d6d3357 (diff) | |
| download | qpid-python-9dc57fe738f366d875c2319dafdfa2c50ce2f20b.tar.gz | |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
77 files changed, 2403 insertions, 549 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index 6c50fe7cfd..e88763dd1d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -23,9 +23,12 @@ package org.apache.qpid.server; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.HashSet; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeoutException; import javax.security.auth.Subject; @@ -34,6 +37,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; @@ -154,6 +158,8 @@ public class Broker implements BrokerShutdownProvider private void startupImpl(final BrokerOptions options) throws Exception { + populateSystemPropertiesFromDefaults(options.getInitialSystemProperties()); + String storeLocation = options.getConfigurationStoreLocation(); String storeType = options.getConfigurationStoreType(); @@ -321,6 +327,37 @@ public class Broker implements BrokerShutdownProvider } } + public static void populateSystemPropertiesFromDefaults(final String initialProperties) throws IOException + { + URL initialPropertiesLocation; + if(initialProperties == null) + { + initialPropertiesLocation = Broker.class.getClassLoader().getResource("system.properties"); + } + else + { + initialPropertiesLocation = (new File(initialProperties)).toURI().toURL(); + } + + Properties props = new Properties(QpidProperties.asProperties()); + if(initialPropertiesLocation != null) + { + + try(InputStream inStream = initialPropertiesLocation.openStream()) + { + props.load(inStream); + } + } + + Set<String> propertyNames = new HashSet<>(props.stringPropertyNames()); + propertyNames.removeAll(System.getProperties().stringPropertyNames()); + for (String propName : propertyNames) + { + System.setProperty(propName, props.getProperty(propName)); + } + } + + private class ShutdownService implements Runnable { public void run() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java index 59075dfb57..ff3d9063f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -78,6 +78,7 @@ public class BrokerOptions private boolean _overwriteConfigurationStore; private Map<String, String> _configProperties = new HashMap<String,String>(); private boolean _startupLoggedToSystemOut = true; + private String _initialSystemProperties; public Map<String, Object> convertToSystemConfigAttributes() { @@ -390,4 +391,24 @@ public class BrokerOptions { this._startupLoggedToSystemOut = startupLoggedToSystemOut; } + + /** + * Get the location of initial JVM system properties to set. This can be URL or a file path + * + * @return the location of initial JVM system properties to set. + */ + public String getInitialSystemProperties() + { + return _initialSystemProperties; + } + + /** + * Set the location of initial properties file to set as JVM system properties. This can be URL or a file path + * + * @param initialSystemProperties the location of initial JVM system properties. + */ + public void setInitialSystemProperties(String initialSystemProperties) + { + _initialSystemProperties = initialSystemProperties; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 76c6b6007f..6012e2e8db 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.binding; -import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -45,10 +44,8 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.StateChangeListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class BindingImpl extends AbstractConfiguredObject<BindingImpl> @@ -108,26 +105,6 @@ public class BindingImpl } } - @Override - protected void onCreate() - { - super.onCreate(); - try - { - _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); - } - catch(AccessControlException e) - { - deleted(); - throw e; - } - if (isDurable()) - { - _queue.getVirtualHost().getDurableConfigurationStore().create(asObjectRecord()); - } - - } - private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes, final AMQQueue queue, final ExchangeImpl exchange) @@ -263,12 +240,6 @@ public class BindingImpl { _arguments = arguments; BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments); - if (isDurable()) - { - VirtualHostImpl<?, ?, ?> vhost = - (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class); - vhost.getDurableConfigurationStore().update(true, asObjectRecord()); - } } } ); @@ -278,6 +249,8 @@ public class BindingImpl @Override public void validateOnCreate() { + _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + AMQQueue queue = getAMQQueue(); Map<String, Object> arguments = getArguments(); if (arguments!=null && !arguments.isEmpty() && FilterSupport.argumentsContainFilter(arguments)) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java index 765e1e4fa5..d6dbe37a6b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java @@ -48,6 +48,7 @@ public class BrokerProperties public static final String PROPERTY_QPID_HOME = "QPID_HOME"; public static final String PROPERTY_QPID_WORK = "QPID_WORK"; public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size"; + public static final String POSIX_FILE_PERMISSIONS = "qpid.default_posix_file_permissions"; private BrokerProperties() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java index 21715f7406..d5bbe16446 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java @@ -25,7 +25,6 @@ import java.util.Collection; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; public class StoreConfigurationChangeListener implements ConfigurationChangeListener @@ -43,7 +42,10 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList { if (newState == State.DELETED) { - _store.remove(object.asObjectRecord()); + if(object.isDurable()) + { + _store.remove(object.asObjectRecord()); + } object.removeChangeListener(this); } } @@ -51,20 +53,23 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList @Override public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) { - // exclude VirtualHostNode children from storing in broker store - if (!(object instanceof VirtualHostNode)) + if (!object.managesChildStorage()) { - child.addChangeListener(this); - _store.update(true,child.asObjectRecord()); + if(object.isDurable() && child.isDurable()) + { + child.addChangeListener(this); + _store.update(true, child.asObjectRecord()); - Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass(); - Collection<Class<? extends ConfiguredObject>> childTypes = child.getModel().getChildTypes(categoryClass); + Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass(); + Collection<Class<? extends ConfiguredObject>> childTypes = + child.getModel().getChildTypes(categoryClass); - for(Class<? extends ConfiguredObject> childClass : childTypes) - { - for (ConfiguredObject<?> grandchild : child.getChildren(childClass)) + for (Class<? extends ConfiguredObject> childClass : childTypes) { - childAdded(child, grandchild); + for (ConfiguredObject<?> grandchild : child.getChildren(childClass)) + { + childAdded(child, grandchild); + } } } } @@ -74,14 +79,20 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList @Override public void childRemoved(ConfiguredObject object, ConfiguredObject child) { - _store.remove(child.asObjectRecord()); + if(child.isDurable()) + { + _store.remove(child.asObjectRecord()); + } child.removeChangeListener(this); } @Override public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) { - _store.update(false, object.asObjectRecord()); + if(object.isDurable()) + { + _store.update(false, object.asObjectRecord()); + } } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 6587bc76b2..cf23e3dd91 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -177,17 +177,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - protected void onCreate() - { - super.onCreate(); - if(isDurable()) - { - getVirtualHost().getDurableConfigurationStore().create(asObjectRecord()); - } - - } - - @Override public EventLogger getEventLogger() { return _virtualHost.getEventLogger(); @@ -213,12 +202,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> throw new RequiredExchangeException(getName()); } - if (isDurable() && !isAutoDelete()) - { - getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord()); - - } - if(_closed.compareAndSet(false,true)) { List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings); @@ -241,11 +224,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } _closeTaskList.clear(); - if (isDurable() && !isAutoDelete()) - { - getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord()); - - } } deleted(); } @@ -665,10 +643,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> doRemoveBinding(b); queue.removeBinding(b); - if (b.isDurable()) - { - _virtualHost.getDurableConfigurationStore().remove(b.asObjectRecord()); - } b.delete(); } @@ -905,10 +879,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> protected void changeAttributes(final Map<String, Object> attributes) { super.changeAttributes(attributes); - if (isDurable() && getState() != State.DELETED) - { - this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord()); - } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java index 127a8d9e52..fcc34ee4de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -62,7 +62,8 @@ public class DefaultDestination implements MessageDestination final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { - if(routingAddress != null && routingAddress.contains("/") && !routingAddress.startsWith("/")) + routingAddress = _virtualHost.getLocalAddress(routingAddress); + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) { String[] parts = routingAddress.split("/",2); ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); @@ -71,7 +72,7 @@ public class DefaultDestination implements MessageDestination return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); } } - else if(routingAddress == null || !routingAddress.contains("/")) + else if(!routingAddress.contains("/")) { ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); if(exchange != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 597fc44e4c..de796a846a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -87,6 +87,12 @@ class HeadersBinding +"' with arguments: " + _binding.getArguments()); _filter = new MessageFilter() { + @Override + public String getName() + { + return ""; + } + @Override public boolean matches(Filterable message) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java new file mode 100644 index 0000000000..dbd6a5f6f6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.common.AMQPFilterTypes; + +public final class ArrivalTimeFilter implements MessageFilter +{ + private final long _startingFrom; + + public ArrivalTimeFilter(final long startingFrom) + { + _startingFrom = startingFrom; + } + + @Override + public String getName() + { + return AMQPFilterTypes.REPLAY_PERIOD.toString(); + } + + @Override + public boolean matches(final Filterable message) + { + return message.getArrivalTime() >= _startingFrom; + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java new file mode 100644 index 0000000000..8c55c8ac76 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import java.util.List; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.PluggableService; + +@PluggableService +public final class ArrivalTimeFilterFactory implements MessageFilterFactory +{ + + @Override + public MessageFilter newInstance(final List<String> arguments) + { + if(arguments == null || arguments.size() != 1) + { + throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); + } + String arg = arguments.get(0); + long startingFrom= Long.parseLong(arg); + + return new ArrivalTimeFilter(startingFrom); + } + + @Override + public String getType() + { + return AMQPFilterTypes.REPLAY_PERIOD.toString(); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java index 69fc520a8f..ad14fa423a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -14,26 +14,62 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. * + * */ package org.apache.qpid.server.filter; -// -// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> -// import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -public interface FilterManager +public class FilterManager { - void add(MessageFilter filter); - void remove(MessageFilter filter); + private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>(); + + public FilterManager() + { + } + + public void add(String name, MessageFilter filter) + { + _filters.put(name, filter); + } + + public boolean allAllow(Filterable msg) + { + for (MessageFilter filter : _filters.values()) + { + if (!filter.matches(msg)) + { + return false; + } + } + return true; + } + + public Iterator<MessageFilter> filters() + { + return _filters.values().iterator(); + } + + public boolean hasFilters() + { + return !_filters.isEmpty(); + } + + public boolean hasFilter(final String name) + { + return _filters.containsKey(name); + } - boolean allAllow(Filterable msg); + @Override + public String toString() + { + return _filters.toString(); + } - Iterator<MessageFilter> filters(); - boolean hasFilters(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java index a159a8506b..28f7fe3554 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.filter; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.common.AMQPFilterTypes; @@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import java.util.Map; - public class FilterManagerFactory { @@ -54,20 +54,13 @@ public class FilterManagerFactory if (selector instanceof String && !selector.equals("")) { - manager = new SimpleFilterManager(); + manager = new FilterManager(); try { - manager.add(new JMSSelectorFilter((String)selector)); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); + MessageFilter filter = new JMSSelectorFilter((String)selector); + manager.add(filter.getName(), filter); } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index d0b1670a45..6b8ae2f552 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -26,12 +26,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -57,15 +59,7 @@ public class FilterSupport { selector = new JMSSelectorFilter(selectorString); } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); } @@ -119,6 +113,7 @@ public class FilterSupport } } + @PluggableService public static final class NoLocalFilter implements MessageFilter { private final MessageSource _queue; @@ -128,6 +123,12 @@ public class FilterSupport _queue = queue; } + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + public boolean matches(Filterable message) { @@ -165,6 +166,8 @@ public class FilterSupport { return _queue != null ? _queue.hashCode() : 0; } + + } static final class CompoundFilter implements MessageFilter @@ -178,6 +181,12 @@ public class FilterSupport _jmsSelectorFilter = jmsSelectorFilter; } + @Override + public String getName() + { + return ""; + } + public boolean matches(Filterable message) { return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); @@ -216,5 +225,7 @@ public class FilterSupport result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); return result; } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index 589e888059..295f9ae074 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -34,6 +34,10 @@ public interface Filterable Object getConnectionReference(); + long getMessageNumber(); + + long getArrivalTime(); + public class Factory { @@ -41,6 +45,7 @@ public interface Filterable { return new Filterable() { + @Override public AMQMessageHeader getMessageHeader() { @@ -64,6 +69,18 @@ public interface Filterable { return message.getConnectionReference(); } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public long getArrivalTime() + { + return message.getArrivalTime(); + } }; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 744e4e4e9d..a36049cd23 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.log4j.Logger; + +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.BooleanExpression; import org.apache.qpid.filter.FilterableMessage; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.SelectorParser; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.plugin.PluggableService; +@PluggableService public class JMSSelectorFilter implements MessageFilter { private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); @@ -46,6 +50,12 @@ public class JMSSelectorFilter implements MessageFilter _matcher = new SelectorParser().parse(selector); } + @Override + public String getName() + { + return AMQPFilterTypes.JMS_SELECTOR.toString(); + } + public boolean matches(Filterable message) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java new file mode 100644 index 0000000000..233edc78cd --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import java.util.List; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.PluggableService; + +@PluggableService +public final class JMSSelectorFilterFactory implements MessageFilterFactory +{ + public String getType() + { + return AMQPFilterTypes.JMS_SELECTOR.toString(); + } + + @Override + public MessageFilter newInstance(final List<String> arguments) + { + if(arguments == null || arguments.size() != 1) + { + throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); + } + String arg = arguments.get(0); + try + { + return new JMSSelectorFilter(arg); + } + catch (ParseException | TokenMgrError | SelectorParsingException e) + { + throw new IllegalArgumentException("Cannot create an JMS Selector from '" + arg + "'", e); + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index d7dbbea166..226d646efd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -22,5 +22,6 @@ package org.apache.qpid.server.filter; public interface MessageFilter { + String getName(); boolean matches(Filterable message); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java deleted file mode 100644 index 111fb6a333..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.filter; - -import org.apache.log4j.Logger; - -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class SimpleFilterManager implements FilterManager -{ - private final Logger _logger = Logger.getLogger(SimpleFilterManager.class); - - private final ConcurrentLinkedQueue<MessageFilter> _filters; - private String _toString = ""; - - public SimpleFilterManager() - { - _logger.debug("Creating SimpleFilterManager"); - _filters = new ConcurrentLinkedQueue<MessageFilter>(); - } - - public SimpleFilterManager(JMSSelectorFilter messageFilter) - { - this(); - add(messageFilter); - } - - public void add(MessageFilter filter) - { - _filters.add(filter); - updateStringValue(); - } - - public void remove(MessageFilter filter) - { - _filters.remove(filter); - updateStringValue(); - } - - public boolean allAllow(Filterable msg) - { - for (MessageFilter filter : _filters) - { - if (!filter.matches(msg)) - { - return false; - } - } - return true; - } - - @Override - public Iterator<MessageFilter> filters() - { - return _filters.iterator(); - } - - public boolean hasFilters() - { - return !_filters.isEmpty(); - } - - - @Override - public String toString() - { - return _toString; - } - - private void updateStringValue() - { - StringBuilder toString = new StringBuilder(); - for (MessageFilter filter : _filters) - { - toString.append(filter.toString()); - toString.append(","); - } - - if (_filters.size() > 0) - { - //Remove the last ',' - toString.deleteCharAt(toString.length()-1); - } - _toString = toString.toString(); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index aef769dc4f..76608cffbf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -122,8 +122,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final TaskExecutor _taskExecutor; private final Class<? extends ConfiguredObject> _category; + private final Class<? extends ConfiguredObject> _typeClass; private final Class<? extends ConfiguredObject> _bestFitInterface; private final Model _model; + private final boolean _managesChildStorage; + @ManagedAttributeField private long _createdTime; @@ -206,6 +209,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _model = model; _category = ConfiguredObjectTypeRegistry.getCategory(getClass()); + Class<? extends ConfiguredObject> typeClass = model.getTypeRegistry().getTypeClass(getClass()); + _typeClass = typeClass == null ? _category : typeClass; _attributeTypes = model.getTypeRegistry().getAttributeTypes(getClass()); _automatedFields = model.getTypeRegistry().getAutomatedFields(getClass()); @@ -242,6 +247,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } _type = ConfiguredObjectTypeRegistry.getType(getClass()); + _managesChildStorage = managesChildren(_category) || managesChildren(_typeClass); _bestFitInterface = calculateBestFitInterface(); if(attributes.get(TYPE) != null && !_type.equals(attributes.get(TYPE))) @@ -315,6 +321,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private boolean managesChildren(final Class<? extends ConfiguredObject> clazz) + { + return clazz.getAnnotation(ManagedObject.class).managesChildren(); + } + private Class<? extends ConfiguredObject> calculateBestFitInterface() { Set<Class<? extends ConfiguredObject>> candidates = new HashSet<Class<? extends ConfiguredObject>>(); @@ -1056,11 +1067,24 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return _model; } + @Override public Class<? extends ConfiguredObject> getCategoryClass() { return _category; } + @Override + public Class<? extends ConfiguredObject> getTypeClass() + { + return _typeClass; + } + + @Override + public boolean managesChildStorage() + { + return _managesChildStorage; + } + public Map<String,String> getContext() { return _context == null ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(_context); @@ -1219,8 +1243,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im if(attr != null && (attr.isAutomated() || attr.isDerived())) { Object value = attr.getValue((X)this); - if(value != null && attr.isSecure() && - !SecurityManager.isSystemProcess()) + if(value != null && !SecurityManager.isSystemProcess() && attr.isSecureValue(value)) { return SECURE_VALUES.get(value.getClass()); } @@ -1620,8 +1643,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { Object desired = attributes.get(name); Object expected = getAttribute(name); - if(((_attributes.get(name) != null && !_attributes.get(name).equals(attributes.get(name))) - || attributes.get(name) != null) + Object currentValue = _attributes.get(name); + if(((currentValue != null && !currentValue.equals(desired)) + || (currentValue == null && desired != null)) && changeAttribute(name, expected, desired)) { attributeSet(name, expected, desired); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java index 15e804e6f5..24e62ce7de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java @@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T> } }; + static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>() + { + @Override + public Object convert(final Object value, final ConfiguredObject object) + { + if(value instanceof String) + { + return AbstractConfiguredObject.interpolate(object, (String) value); + } + else if(value == null) + { + return null; + } + else + { + return value; + } + } + }; static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>() { @Override @@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T> } else if(Map.class.isAssignableFrom(type)) { - return (AttributeValueConverter<X>) MAP_CONVERTER; + if(returnType instanceof ParameterizedType) + { + Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0]; + Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1]; + + return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType); + } + else + { + return (AttributeValueConverter<X>) MAP_CONVERTER; + } } else if(Collection.class.isAssignableFrom(type)) { @@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T> { return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type); } + else if(Object.class == type) + { + return (AttributeValueConverter<X>) OBJECT_CONVERTER; + } throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName()); } @@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T> } } + public static class GenericMapConverter extends AttributeValueConverter<Map> + { + + private final AttributeValueConverter<?> _keyConverter; + private final AttributeValueConverter<?> _valueConverter; + + + public GenericMapConverter(final Type keyType, final Type valueType) + { + _keyConverter = getConverter(getRawType(keyType), keyType); + + _valueConverter = getConverter(getRawType(valueType), valueType); + } + + + @Override + public Map convert(final Object value, final ConfiguredObject object) + { + if(value instanceof Map) + { + Map<?,?> original = (Map<?,?>)value; + Map converted = new LinkedHashMap(original.size()); + for(Map.Entry<?,?> entry : original.entrySet()) + { + converted.put(_keyConverter.convert(entry.getKey(),object), + _valueConverter.convert(entry.getValue(), object)); + } + return Collections.unmodifiableMap(converted); + } + else if(value == null) + { + return null; + } + else + { + if(value instanceof String) + { + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + return convert(objectMapper.readValue(interpolated, Map.class), object); + } + catch (IOException e) + { + // fall through to the non-JSON single object case + } + } + + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map"); + } + + } + } + + static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X> { private final Class<X> _klazz; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java index 9fca898dc0..342b7ac0ba 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java @@ -28,6 +28,7 @@ import java.lang.reflect.Type; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.regex.Pattern; import org.apache.log4j.Logger; @@ -37,6 +38,7 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend private final ManagedAttribute _annotation; private final Method _validValuesMethod; + private final Pattern _secureValuePattern; ConfiguredAutomatedAttribute(final Class<C> clazz, final Method getter, @@ -53,6 +55,16 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend validValuesMethod = getValidValuesMethod(validValue, clazz); } _validValuesMethod = validValuesMethod; + + String secureValueFilter = _annotation.secureValueFilter(); + if (secureValueFilter == null || "".equals(secureValueFilter)) + { + _secureValuePattern = null; + } + else + { + _secureValuePattern = Pattern.compile(secureValueFilter); + } } private Method getValidValuesMethod(final String validValue, final Class<C> clazz) @@ -140,6 +152,11 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend return _annotation.description(); } + public Pattern getSecureValueFilter() + { + return _secureValuePattern; + } + public Collection<String> validValues() { if(_validValuesMethod != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java index 71488edb8c..20fd0264c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java @@ -21,10 +21,12 @@ package org.apache.qpid.server.model; import java.lang.reflect.Method; +import java.util.regex.Pattern; public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttribute<C,T> { private final DerivedAttribute _annotation; + private final Pattern _secureValuePattern; ConfiguredDerivedAttribute(final Class<C> clazz, final Method getter, @@ -32,6 +34,16 @@ public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends { super(clazz, getter); _annotation = annotation; + + String secureValueFilter = _annotation.secureValueFilter(); + if (secureValueFilter == null || "".equals(secureValueFilter)) + { + _secureValuePattern = null; + } + else + { + _secureValuePattern = Pattern.compile(secureValueFilter); + } } public boolean isAutomated() @@ -72,4 +84,10 @@ public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends return _annotation.description(); } + @Override + public Pattern getSecureValueFilter() + { + return _secureValuePattern; + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 89fda6798b..bfe9c8b15d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -239,6 +239,9 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; Class<? extends ConfiguredObject> getCategoryClass(); + Class<? extends ConfiguredObject> getTypeClass(); + + boolean managesChildStorage(); <C extends ConfiguredObject<C>> C findConfiguredObject(Class<C> clazz, String name); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java index 73b7839a8e..94610a6cb5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.model; import java.lang.reflect.Method; import java.lang.reflect.Type; +import java.util.regex.Pattern; public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T> { @@ -49,6 +50,25 @@ public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> e public abstract String getDescription(); + public abstract Pattern getSecureValueFilter(); + + public boolean isSecureValue(Object value) + { + if (isSecure()) + { + Pattern filter = getSecureValueFilter(); + if (filter == null) + { + return true; + } + else + { + return filter.matcher(String.valueOf(value)).matches(); + } + } + return false; + } + public T convert(final Object value, C object) { final AttributeValueConverter<T> converter = getConverter(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 27d914c639..5026df0e19 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -156,7 +156,7 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory factory = categoryFactories.get(_defaultTypes.get(category)); if(factory == null) { - throw new NoFactoryForTypeException(category, _defaultTypes.get(category)); + throw new NoFactoryForTypeException(category, type); } } return factory; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index d134c43bda..d0c6fb041e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -385,7 +385,7 @@ public class ConfiguredObjectTypeRegistry return null; } - private Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz) + public Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz) { String typeName = getType(clazz); Class<? extends ConfiguredObject> typeClass = null; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java index e5c17a17e4..6de6bf25c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java @@ -32,4 +32,5 @@ public @interface DerivedAttribute boolean persist() default false; String description() default ""; boolean oversize() default false; + String secureValueFilter() default ""; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java index 05b2c610ba..2f96299703 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java @@ -37,4 +37,5 @@ public @interface ManagedAttribute String[] validValues() default {}; boolean oversize() default false; String oversizedAltText() default ""; + String secureValueFilter() default ""; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 46fbaac3f2..ba1f262cfc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.model; import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.store.MessageDurability; @@ -48,6 +50,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String QUEUE_FLOW_STOPPED = "queueFlowStopped"; String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String DEFAULT_FILTERS = "defaultFilters"; + String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers"; String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) @@ -67,6 +71,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); + @ManagedAttribute( defaultValue = "false" ) + boolean isEnsureNondestructiveConsumers(); + @DerivedAttribute( persist = true ) String getOwner(); @@ -155,6 +162,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute long getMaximumMessageTtl(); + @ManagedAttribute + Map<String, Map<String,List<String>>> getDefaultFilters(); + //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java index 21cbfcf194..3b751a3a10 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.model; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ManagedObject; - @ManagedObject(category=true, managesChildren=false, creatable=false) public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X> { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java index a69808180d..c1a05a9e35 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.model; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -37,6 +38,9 @@ public interface SystemConfig<X extends SystemConfig<X>> extends ConfiguredObjec String INITIAL_CONFIGURATION_LOCATION = "initialConfigurationLocation"; String STARTUP_LOGGED_TO_SYSTEM_OUT = "startupLoggedToSystemOut"; + @ManagedContextDefault(name = BrokerProperties.POSIX_FILE_PERMISSIONS) + String DEFAULT_POSIX_FILE_PERMISSIONS = "rw-r-----"; + @ManagedAttribute(defaultValue = "false") boolean isManagementMode(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 79f37b66cb..cc758ba7c9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -22,14 +22,16 @@ package org.apache.qpid.server.model; import java.security.AccessControlException; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.store.MessageStore; -@ManagedObject( managesChildren = true, defaultType = "ProvidedStore") +@ManagedObject( defaultType = "ProvidedStore") public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X> { @@ -42,6 +44,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn"; String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount"; String MODEL_VERSION = "modelVersion"; + String ENABLED_CONNECTION_VALIDATORS = "enabledConnectionValidators"; + String DISABLED_CONNECTION_VALIDATORS = "disabledConnectionValidators"; + String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains"; @ManagedContextDefault( name = "queue.deadLetterQueueEnabled") public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; @@ -88,6 +93,21 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, @DerivedAttribute( persist = true ) String getModelVersion(); + @ManagedContextDefault( name = "virtualhost.enabledConnectionValidators") + String DEFAULT_ENABLED_VALIDATORS = "[]"; + + @ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}") + List<String> getEnabledConnectionValidators(); + + @ManagedContextDefault( name = "virtualhost.disabledConnectionValidators") + String DEFAULT_DISABLED_VALIDATORS = "[]"; + + @ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}") + List<String> getDisabledConnectionValidators(); + + @ManagedAttribute( defaultValue = "[]") + List<String> getGlobalAddressDomains(); + @ManagedStatistic long getQueueCount(); @@ -129,6 +149,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void delete(); + String getRedirectHost(AmqpPort<?> port); + public static interface Transaction { void dequeue(MessageInstance entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java index fa35e725c9..3bc19ef7bd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java @@ -24,7 +24,7 @@ import java.util.Collection; import org.apache.qpid.server.store.DurableConfigurationStore; -@ManagedObject(category=true, managesChildren=false) +@ManagedObject(category=true, managesChildren=true) public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X> { String QPID_INITIAL_CONFIG_VIRTUALHOST_CONFIG_VAR = "qpid.initial_config_virtualhost_config"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index eae438754b..0cbb80d722 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -80,7 +80,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection { Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(ID, UUID.randomUUID()); - attributes.put(NAME, _connection.getRemoteAddressString().replaceAll("/", "")); + attributes.put(NAME, "[" + _connection.getConnectionId() + "] " + _connection.getRemoteAddressString().replaceAll("/", "")); attributes.put(DURABLE, false); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java index 631ed3e8f7..cb9727f3f6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java @@ -25,7 +25,7 @@ import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; -@ManagedObject( category = false, type = "GroupFile" ) +@ManagedObject( category = false, type = "GroupFile", managesChildren = true ) public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>, GroupManagingGroupProvider { String PATH="path"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index 19aec414de..327b7ddfe9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -34,6 +34,7 @@ import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -50,6 +51,7 @@ import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.group.FileGroupDatabase; import org.apache.qpid.server.security.group.GroupPrincipal; +import org.apache.qpid.server.util.FileHelper; public class FileBasedGroupProviderImpl extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl> @@ -162,9 +164,11 @@ public class FileBasedGroupProviderImpl { throw new IllegalConfigurationException(String.format("Cannot create groups file at '%s'",_path)); } + try { - file.createNewFile(); + String posixFileAttributes = getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS); + new FileHelper().createNewFile(file, posixFileAttributes); } catch (IOException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java index e3ded3006d..7046f2973e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java @@ -21,14 +21,14 @@ package org.apache.qpid.server.model.adapter; -import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -38,6 +38,9 @@ import java.util.Set; import java.util.TreeMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; @@ -118,7 +121,7 @@ public class FileSystemPreferencesProviderImpl FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path)); // we need to check and create file if it does not exist every time on open - store.createIfNotExist(); + store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); store.open(); _store = store; _open = true; @@ -184,6 +187,7 @@ public class FileSystemPreferencesProviderImpl if(_store != null) { + _store.close(); _store.delete(); deleted(); _authenticationProvider.setPreferencesProvider(null); @@ -280,7 +284,7 @@ public class FileSystemPreferencesProviderImpl else { FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path)); - store.createIfNotExist(); + store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); store.open(); _store = store; } @@ -334,9 +338,9 @@ public class FileSystemPreferencesProviderImpl { private final ObjectMapper _objectMapper; private final Map<String, Map<String, Object>> _preferences; + private final FileHelper _fileHelper; private File _storeFile; private FileLock _storeLock; - private RandomAccessFile _storeRAF; public FileSystemPreferencesStore(File preferencesFile) { @@ -345,9 +349,10 @@ public class FileSystemPreferencesProviderImpl _objectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); _objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); _preferences = new TreeMap<String, Map<String, Object>>(); + _fileHelper = new FileHelper(); } - public void createIfNotExist() + public void createIfNotExist(String filePermissions) { if (!_storeFile.exists()) { @@ -358,7 +363,8 @@ public class FileSystemPreferencesProviderImpl } try { - if (_storeFile.createNewFile() && !_storeFile.exists()) + Path path = _fileHelper.createNewFile(_storeFile, filePermissions); + if (!Files.exists(path)) { throw new IllegalConfigurationException(String.format("Cannot create preferences store file at '%s'", _storeFile.getAbsolutePath())); } @@ -391,43 +397,20 @@ public class FileSystemPreferencesProviderImpl } try { - _storeRAF = new RandomAccessFile(_storeFile, "rw"); - FileChannel fileChannel = _storeRAF.getChannel(); - try - { - _storeLock = fileChannel.tryLock(); - } - catch (OverlappingFileLockException e) - { - _storeLock = null; - } - if (_storeLock == null) + getFileLock(_storeFile.getPath() + ".lck"); + if (_storeFile.length() > 0) { - throw new IllegalConfigurationException("Cannot get lock on store file " + _storeFile.getName() - + " is another instance running?"); - } - long fileSize = fileChannel.size(); - if (fileSize > 0) - { - ByteBuffer buffer = ByteBuffer.allocate((int) fileSize); - fileChannel.read(buffer); - buffer.rewind(); - buffer.flip(); - byte[] data = buffer.array(); - try - { - Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(data, - new TypeReference<Map<String, Map<String, Object>>>() - { - }); - _preferences.putAll(preferencesMap); - } - catch (JsonProcessingException e) - { - throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e); - } + Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(_storeFile, + new TypeReference<Map<String, Map<String, Object>>>() + { + }); + _preferences.putAll(preferencesMap); } } + catch (JsonProcessingException e) + { + throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e); + } catch (IOException e) { throw new IllegalConfigurationException("Cannot load preferences from " + _storeFile.getName(), e); @@ -443,6 +426,7 @@ public class FileSystemPreferencesProviderImpl if (_storeLock != null) { _storeLock.release(); + _storeLock.channel().close(); } } catch (IOException e) @@ -452,22 +436,7 @@ public class FileSystemPreferencesProviderImpl finally { _storeLock = null; - try - { - if (_storeRAF != null) - { - _storeRAF.close(); - } - } - catch (IOException e) - { - LOGGER.error("Cannot close preferences file", e); - } - finally - { - _storeRAF = null; - _preferences.clear(); - } + _preferences.clear(); } } } @@ -544,16 +513,14 @@ public class FileSystemPreferencesProviderImpl checkStoreOpened(); try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - _objectMapper.writeValue(baos, _preferences); - FileChannel channel = _storeRAF.getChannel(); - long currentSize = channel.size(); - channel.position(0); - channel.write(ByteBuffer.wrap(baos.toByteArray())); - if (currentSize > baos.size()) + _fileHelper.writeFileSafely(_storeFile.toPath(), new BaseAction<File, IOException>() { - channel.truncate(baos.size()); - } + @Override + public void performAction(File file) throws IOException + { + _objectMapper.writeValue(file, _preferences); + } + }); } catch (IOException e) { @@ -569,5 +536,32 @@ public class FileSystemPreferencesProviderImpl } } + private void getFileLock(String lockFilePath) + { + File lockFile = new File(lockFilePath); + try + { + lockFile.createNewFile(); + lockFile.deleteOnExit(); + + @SuppressWarnings("resource") + FileOutputStream out = new FileOutputStream(lockFile); + FileChannel channel = out.getChannel(); + _storeLock = channel.tryLock(); + } + catch (IOException ioe) + { + throw new IllegalStateException("Cannot create the lock file " + lockFile.getName(), ioe); + } + catch(OverlappingFileLockException e) + { + _storeLock = null; + } + + if(_storeLock == null) + { + throw new IllegalStateException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?"); + } + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java new file mode 100644 index 0000000000..11f8944863 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import org.apache.qpid.server.protocol.AMQConnectionModel; + +public interface ConnectionValidator extends Pluggable +{ + boolean validateConnectionCreation(AMQConnectionModel<?, ?> connection); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java new file mode 100644 index 0000000000..372642310e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.util.List; + +import org.apache.qpid.server.filter.MessageFilter; + +public interface MessageFilterFactory extends Pluggable +{ + MessageFilter newInstance(List<String> arguments); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java index f70afb12ba..e6100efda7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java @@ -19,8 +19,11 @@ package org.apache.qpid.server.plugin; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.ServiceLoader; import org.apache.log4j.Logger; @@ -47,6 +50,16 @@ public class QpidServiceLoader return instancesOf(clazz, true); } + public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz) + { + Map<String,C> instances = new HashMap<>(); + for(C instance : instancesOf(clazz)) + { + instances.put(instance.getType(), instance); + } + return Collections.unmodifiableMap(instances); + } + private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 60999fb2be..639d569e8f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -30,7 +30,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -52,6 +54,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -75,6 +78,8 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; @@ -186,6 +191,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private MessageDurability _messageDurability; + @ManagedAttributeField + private Map<String, Map<String,List<String>>> _defaultFilters; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set<NotificationCheck> _notificationChecks = @@ -241,12 +249,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private long _minimumMessageTtl; @ManagedAttributeField private long _maximumMessageTtl; + @ManagedAttributeField + private boolean _ensureNondestructiveConsumers; private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); private boolean _closing; + private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>(); protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -283,11 +294,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> }); } - if (isDurable()) - { - _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); - } - else if(getMessageDurability() != MessageDurability.NEVER) + if(!isDurable() && getMessageDurability() != MessageDurability.NEVER) { Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() @@ -351,17 +358,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> case PRINCIPAL: _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal(); - if(isDurable()) - { - _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord()); - } break; case CONTAINER: _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName(); - if(isDurable()) - { - _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord()); - } break; case CONNECTION: _exclusiveOwner = sessionModel.getConnectionModel(); @@ -450,6 +449,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES); + + if(_defaultFilters != null) + { + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + final Map<String, MessageFilterFactory> messageFilterFactories = + qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); + + for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet()) + { + String name = String.valueOf(entry.getKey()); + Map<String, List<String>> filterValue = entry.getValue(); + if(filterValue.size() == 1) + { + String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); + MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); + if(filterFactory != null) + { + List<String> filterArguments = filterValue.values().iterator().next(); + _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); + } + else + { + throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet()); + } + } + else + { + throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue); + + } + + } + } + updateAlertChecks(); } @@ -555,6 +588,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public Map<String, Map<String, List<String>>> getDefaultFilters() + { + return _defaultFilters; + } + + @Override public final MessageDurability getMessageDurability() { return _messageDurability; @@ -573,6 +612,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public boolean isEnsureNondestructiveConsumers() + { + return _ensureNondestructiveConsumers; + } + + + + @Override public Collection<String> getAvailableAttributes() { return new ArrayList<String>(_arguments.keySet()); @@ -603,7 +650,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - final FilterManager filters, + FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, EnumSet<ConsumerImpl.Option> optionSet) @@ -699,6 +746,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { throw new ExistingConsumerPreventsExclusive(); } + if(!_defaultFiltersMap.isEmpty()) + { + if(filters == null) + { + filters = new FilterManager(); + } + for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet()) + { + if(!filters.hasFilter(filter.getKey())) + { + filters.add(filter.getKey(), filter.getValue()); + } + } + } + + if(_ensureNondestructiveConsumers) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); + } QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 49732e8345..367a12057d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -62,11 +62,16 @@ public class QueueArgumentsConverter public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters"; + + public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers"; /** * No-local queue argument is used to support the no-local feature of Durable Subscribers. */ public static final String QPID_NO_LOCAL = "no-local"; + static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>(); + static { ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); @@ -99,6 +104,8 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); + ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS); + ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 452c5ff14f..917c951b6d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -371,11 +371,16 @@ public abstract class QueueEntryImpl implements QueueEntry } } - private void dequeue() + private boolean dequeue() { EntryState state = _state; - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + while(state.getState() == State.ACQUIRED && !_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + { + state = _state; + } + + if(state.getState() == State.ACQUIRED) { if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { @@ -387,7 +392,11 @@ public abstract class QueueEntryImpl implements QueueEntry { notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); } - + return true; + } + else + { + return false; } } @@ -420,9 +429,10 @@ public abstract class QueueEntryImpl implements QueueEntry public void delete() { - dequeue(); - - dispose(); + if(dequeue()) + { + dispose(); + } } public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java index 0607f4b3d3..8b6a83d443 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java @@ -62,7 +62,7 @@ public interface FileKeyStore<X extends FileKeyStore<X>> extends KeyStore<X> @ManagedAttribute(defaultValue = "${this:path}") String getDescription(); - @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT) + @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*") String getStoreUrl(); @DerivedAttribute diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java index 78509182b5..f239b83f27 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java @@ -31,7 +31,7 @@ public interface NonJavaKeyStore<X extends NonJavaKeyStore<X>> extends KeyStore< @ManagedAttribute(defaultValue = "${this:subjectName}") String getDescription(); - @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT ) + @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*") String getPrivateKeyUrl(); @ManagedAttribute( mandatory = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT ) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java index cb5bc54cd2..2c692ddf4d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.security.auth.database; import org.apache.log4j.Logger; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.login.AccountNotFoundException; @@ -36,7 +38,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -45,9 +46,9 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr protected static final String DEFAULT_ENCODING = "utf-8"; private final Pattern _regexp = Pattern.compile(":"); - private final Map<String, U> _userMap = new HashMap<String, U>(); + private final Map<String, U> _userMap = new HashMap<>(); private final ReentrantLock _userUpdate = new ReentrantLock(); - private final Random _random = new Random(); + private final FileHelper _fileHelper = new FileHelper(); private File _passwordFile; public final void open(File passwordFile) throws IOException @@ -181,7 +182,7 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr try { _userUpdate.lock(); - final Map<String, U> newUserMap = new HashMap<String, U>(); + final Map<String, U> newUserMap = new HashMap<>(); BufferedReader reader = null; try @@ -224,71 +225,33 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr protected abstract Logger getLogger(); - protected File createTempFileOnSameFilesystem() - { - File liveFile = _passwordFile; - File tmp; - - do - { - tmp = new File(liveFile.getPath() + _random.nextInt() + ".tmp"); - } - while(tmp.exists()); - tmp.deleteOnExit(); - return tmp; - } - - protected void swapTempFileToLive(final File temp) throws IOException + protected void savePasswordFile() throws IOException { - File live = _passwordFile; - // Remove any existing ".old" file - final File old = new File(live.getAbsoluteFile() + ".old"); - if (old.exists()) + try { - old.delete(); - } + _userUpdate.lock(); - // Create an new ".old" file - if(!live.renameTo(old)) - { - //unable to rename the existing file to the backup name - getLogger().error("Could not backup the existing password file"); - throw new IOException("Could not backup the existing password file"); + _fileHelper.writeFileSafely(_passwordFile.toPath(), new BaseAction<File,IOException>() + { + @Override + public void performAction(File file) throws IOException + { + writeToFile(file); + } + }); } - - // Move temp file to be the new "live" file - if(!temp.renameTo(live)) + finally { - //failed to rename the new file to the required filename - if(!old.renameTo(live)) - { - //unable to return the backup to required filename - getLogger().error( - "Could not rename the new password file into place, and unable to restore original file"); - throw new IOException("Could not rename the new password file into place, and unable to restore original file"); - } - - getLogger().error("Could not rename the new password file into place"); - throw new IOException("Could not rename the new password file into place"); + _userUpdate.unlock(); } } - protected void savePasswordFile() throws IOException + private void writeToFile(File tmp) throws IOException { - try - { - _userUpdate.lock(); - - BufferedReader reader = null; - PrintStream writer = null; - - File tmp = createTempFileOnSameFilesystem(); - - try + try(PrintStream writer = new PrintStream(tmp); + BufferedReader reader = new BufferedReader(new FileReader(_passwordFile))) { - writer = new PrintStream(tmp); - reader = new BufferedReader(new FileReader(_passwordFile)); String line; while ((line = reader.readLine()) != null) @@ -346,32 +309,6 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr getLogger().error("Unable to create the new password file: " + e); throw new IOException("Unable to create the new password file",e); } - finally - { - - try - { - if (reader != null) - { - reader.close(); - } - } - finally - { - if (writer != null) - { - writer.close(); - } - } - - } - - swapTempFileToLive(tmp); - } - finally - { - _userUpdate.unlock(); - } } protected abstract U createUserFromPassword(Principal principal, char[] passwd); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java index 6fe649ff04..0b5e17306c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -@ManagedObject( category = false, type = "Base64MD5PasswordFile" ) +@ManagedObject( category = false, managesChildren = true, type = "Base64MD5PasswordFile" ) public class Base64MD5PasswordDatabaseAuthenticationManager extends PrincipalDatabaseAuthenticationManager<Base64MD5PasswordDatabaseAuthenticationManager> { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java index e6d2fcf44c..b86bfac0ad 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -@ManagedObject( category = false, type = "PlainPasswordFile" ) +@ManagedObject( category = false, managesChildren = true, type = "PlainPasswordFile" ) public class PlainPasswordDatabaseAuthenticationManager extends PrincipalDatabaseAuthenticationManager<PlainPasswordDatabaseAuthenticationManager> { public static final String PROVIDER_TYPE = "PlainPasswordFile"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index d3c9635502..cf165ff4af 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.security.auth.manager; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.AccessControlException; import java.security.Principal; import java.util.Collection; @@ -40,6 +42,7 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -56,6 +59,7 @@ import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.util.FileHelper; public abstract class PrincipalDatabaseAuthenticationManager<T extends PrincipalDatabaseAuthenticationManager<T>> extends AbstractAuthenticationManager<T> @@ -96,7 +100,11 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { try { - passwordFile.createNewFile(); + Path path = new FileHelper().createNewFile(passwordFile, getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS)); + if (!Files.exists(path)) + { + throw new IllegalConfigurationException(String.format("Cannot create password file at '%s'", _path)); + } } catch (IOException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java index 0a02ce38fc..1e9e646e35 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java @@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.apache.qpid.server.util.ServerScopedRuntimeException; /** @@ -232,9 +234,9 @@ public class FileGroupDatabase implements GroupDatabase } } - private synchronized void writeGroupFile(String groupFile) throws IOException + private synchronized void writeGroupFile(final String groupFile) throws IOException { - Properties propertiesFile = new Properties(); + final Properties propertiesFile = new Properties(); for (String group : _groupToUserMap.keySet()) { @@ -244,19 +246,19 @@ public class FileGroupDatabase implements GroupDatabase propertiesFile.setProperty(group + ".users", userList); } - String comment = "Written " + new Date(); - FileOutputStream fileOutputStream = new FileOutputStream(groupFile); - try - { - propertiesFile.store(fileOutputStream, comment); - } - finally + + new FileHelper().writeFileSafely(new File(groupFile).toPath(), new BaseAction<File, IOException>() { - if(fileOutputStream != null) + @Override + public void performAction(File file) throws IOException { - fileOutputStream.close(); + String comment = "Written " + new Date(); + try(FileOutputStream fileOutputStream = new FileOutputStream(file)) + { + propertiesFile.store(fileOutputStream, comment); + } } - } + }); } private void validatePropertyNameIsGroupName(String propertyName) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index 0de6543713..ad671d7e99 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,6 +42,9 @@ import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.Version; @@ -85,6 +90,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>(); private final ObjectMapper _objectMapper = new ObjectMapper(); private final Class<? extends ConfiguredObject> _rootClass; + private final FileHelper _fileHelper; private Map<String,Class<? extends ConfiguredObject>> _classNameMapping; private String _directoryName; @@ -123,6 +129,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore _objectMapper.registerModule(_module); _objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT); _rootClass = rootClass; + _fileHelper = new FileHelper(); } @Override @@ -173,7 +180,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore _directoryName = fileFromSettings.getParent(); _configFileName = fileFromSettings.getName(); _backupFileName = fileFromSettings.getName() + ".bak"; - _tempFileName = fileFromSettings.getName() + ".tmp";; + _tempFileName = fileFromSettings.getName() + ".tmp"; _lockFileName = fileFromSettings.getName() + ".lck"; } @@ -191,56 +198,45 @@ public class JsonFileConfigStore implements DurableConfigurationStore checkDirectoryIsWritable(_directoryName); getFileLock(); - if(!fileExists(_configFileName)) + Path storeFile = new File(_directoryName, _configFileName).toPath(); + Path backupFile = new File(_directoryName, _backupFileName).toPath(); + if(!Files.exists(storeFile)) { - if(!fileExists(_backupFileName)) + if(!Files.exists(backupFile)) { - File newFile = new File(_directoryName, _configFileName); try { - _objectMapper.writeValue(newFile, Collections.emptyMap()); + String posixFileAttributes = _parent.getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS); + storeFile = _fileHelper.createNewFile(storeFile, posixFileAttributes); + _objectMapper.writeValue(storeFile.toFile(), Collections.emptyMap()); } catch (IOException e) { - throw new StoreException("Could not write configuration file " + newFile, e); + throw new StoreException("Could not write configuration file " + storeFile, e); } } else { - renameFile(_backupFileName, _configFileName); + try + { + _fileHelper.atomicFileMoveOrReplace(backupFile, storeFile); + } + catch (IOException e) + { + throw new StoreException("Could not move backup to configuration file " + storeFile, e); + } } } - deleteFileIfExists(_backupFileName); - } - - private void renameFile(String fromFileName, String toFileName) - { - File toFile = deleteFileIfExists(toFileName); - File fromFile = new File(_directoryName, fromFileName); - if(!fromFile.renameTo(toFile)) + try { - throw new StoreException("Cannot rename file " + fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath()); + Files.deleteIfExists(backupFile); } - } - - private File deleteFileIfExists(final String toFileName) - { - File toFile = new File(_directoryName, toFileName); - if(toFile.exists()) + catch (IOException e) { - if(!toFile.delete()) - { - throw new StoreException("Cannot delete file " + toFile.getAbsolutePath()); - } + throw new StoreException("Could not delete backup file " + backupFile, e); } - return toFile; - } - private boolean fileExists(String fileName) - { - File file = new File(_directoryName, fileName); - return file.exists(); } private void getFileLock() @@ -396,7 +392,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore private void save() { UUID rootId = getRootId(); - Map<String, Object> data = null; + final Map<String, Object> data; if (rootId == null) { @@ -409,15 +405,18 @@ public class JsonFileConfigStore implements DurableConfigurationStore try { - deleteFileIfExists(_tempFileName); - deleteFileIfExists(_backupFileName); - - File tmpFile = new File(_directoryName, _tempFileName); - _objectMapper.writeValue(tmpFile, data); - renameFile(_configFileName, _backupFileName); - renameFile(tmpFile.getName(),_configFileName); - tmpFile.delete(); - deleteFileIfExists(_backupFileName); + Path tmpFile = new File(_directoryName, _tempFileName).toPath(); + _fileHelper.writeFileSafely( new File(_directoryName, _configFileName).toPath(), + new File(_directoryName, _backupFileName).toPath(), + tmpFile, + new BaseAction<File, IOException>() + { + @Override + public void performAction(File file) throws IOException + { + _objectMapper.writeValue(file, data); + } + }); } catch (IOException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index 10d8a5d61c..c59fd821c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.store; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -30,14 +32,19 @@ import java.util.UUID; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.util.Action; public class VirtualHostStoreUpgraderAndRecoverer { @@ -509,12 +516,100 @@ public class VirtualHostStoreUpgraderAndRecoverer } - public void perform(DurableConfigurationStore durableConfigurationStore) + public void perform(final DurableConfigurationStore durableConfigurationStore) { String virtualHostCategory = VirtualHost.class.getSimpleName(); GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders); upgraderHandler.upgrade(); new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords()); + + final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore); + if(_virtualHostNode.getVirtualHost() != null) + { + applyRecursively(_virtualHostNode.getVirtualHost(), new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + object.addChangeListener(configChangeListener); + } + }); + } + _virtualHostNode.addChangeListener(new ConfigurationChangeListener() + { + @Override + public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) + { + + } + + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + if(child instanceof VirtualHost) + { + applyRecursively(child, new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> object) + { + if(object.isDurable()) + { + durableConfigurationStore.update(true, object.asObjectRecord()); + object.addChangeListener(configChangeListener); + } + } + }); + + } + } + + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + if(child instanceof VirtualHost) + { + child.removeChangeListener(configChangeListener); + } + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + }); + } + + private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action) + { + applyRecursively(object, action, new HashSet<ConfiguredObject<?>>()); + } + + private void applyRecursively(final ConfiguredObject<?> object, + final Action<ConfiguredObject<?>> action, + final HashSet<ConfiguredObject<?>> visited) + { + if(!visited.contains(object)) + { + visited.add(object); + action.performAction(object); + for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass())) + { + Collection<? extends ConfiguredObject> children = object.getChildren(childClass); + if(children != null) + { + for(ConfiguredObject<?> child : children) + { + applyRecursively(child, action, visited); + } + } + } + } } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index 8d35e1a94f..d2351b5500 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -36,8 +36,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.LoggerFactory; + + public class SelectorThread extends Thread { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); + public static final String IO_THREAD_NAME_PREFIX = "NCS-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); @@ -165,7 +170,8 @@ public class SelectorThread extends Thread NonBlockingConnection connection = iterator.next(); int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) + + if (period <= 0 || connection.isStateChanged()) { toBeScheduled.add(connection); connection.getSocketChannel().register(_selector, 0).cancel(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java index 0d53b4d03b..715709a1b2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.util; -public interface Action<T> +public interface Action<T> extends BaseAction<T, RuntimeException> { void performAction(T object); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java new file mode 100644 index 0000000000..f7dbcb4d3c --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public interface BaseAction<T, E extends Exception> +{ + void performAction(T object) throws E; +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java new file mode 100644 index 0000000000..0e1a28f220 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.AtomicMoveNotSupportedException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.PosixFileAttributeView; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; + +public class FileHelper +{ + + public void writeFileSafely(Path targetFile, BaseAction<File, IOException> operation) throws IOException + { + String name = targetFile.toFile().getName(); + writeFileSafely(targetFile, + targetFile.resolveSibling(name + ".bak"), + targetFile.resolveSibling(name + ".tmp"), + operation); + } + + public void writeFileSafely(Path targetFile, Path backupFile, Path tmpFile, BaseAction<File, IOException> write) throws IOException + { + Files.deleteIfExists(tmpFile); + Files.deleteIfExists(backupFile); + + Set<PosixFilePermission> permissions = null; + if (Files.exists(targetFile) && isPosixFileSystem(targetFile)) + { + permissions = Files.getPosixFilePermissions(targetFile); + } + + tmpFile = createNewFile(tmpFile, permissions); + + write.performAction(tmpFile.toFile()); + + atomicFileMoveOrReplace(targetFile, backupFile); + + if (permissions != null) + { + Files.setPosixFilePermissions(backupFile, permissions); + } + + atomicFileMoveOrReplace(tmpFile, targetFile); + + Files.deleteIfExists(tmpFile); + Files.deleteIfExists(backupFile); + } + + public Path createNewFile(File newFile, String posixFileAttributes) throws IOException + { + return createNewFile(newFile.toPath(), posixFileAttributes); + } + + public Path createNewFile(Path newFile, String posixFileAttributes) throws IOException + { + Set<PosixFilePermission> permissions = posixFileAttributes == null ? null : PosixFilePermissions.fromString(posixFileAttributes); + return createNewFile(newFile, permissions ); + } + + public Path createNewFile(Path newFile, Set<PosixFilePermission> permissions) throws IOException + { + if (!Files.exists(newFile)) + { + newFile = Files.createFile(newFile); + } + + if (permissions != null && isPosixFileSystem(newFile)) + { + Files.setPosixFilePermissions(newFile, permissions); + } + + return newFile; + } + + public boolean isPosixFileSystem(Path path) throws IOException + { + while (!Files.exists(path)) + { + path = path.getParent(); + + if (path == null) + { + return false; + } + } + return Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class); + } + + public Path atomicFileMoveOrReplace(Path sourceFile, Path targetFile) throws IOException + { + try + { + return Files.move(sourceFile, targetFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + catch(AtomicMoveNotSupportedException e) + { + if (sourceFile.toFile().renameTo(targetFile.toFile())) + { + return targetFile; + } + else + { + throw new RuntimeException("Atomic move is unsupported and rename from : '" + + sourceFile + "' to: '" + targetFile + "' failed."); + } + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index dce902b126..06917f0161 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -63,6 +63,8 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.plugin.ConnectionValidator; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -75,7 +77,6 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -94,6 +95,8 @@ import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X> implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener { + private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>(); + private static enum BlockingType { STORE, FILESYSTEM }; private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery"; @@ -162,6 +165,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @ManagedAttributeField private int _housekeepingThreadCount; + @ManagedAttributeField + private List<String> _enabledConnectionValidators; + + @ManagedAttributeField + private List<String> _disabledConnectionValidators; + + @ManagedAttributeField + private List<String> _globalAddressDomains; private boolean _useAsyncRecoverer; @@ -212,6 +223,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } + if(getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + validateGlobalAddressDomain(domain); + } + } } @Override @@ -230,6 +248,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte throw new IntegrityViolationException("Cannot delete default virtual host '" + getName() + "'"); } } + if(changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS)) + { + VirtualHost<?, ?, ?> virtualHost = (VirtualHost<?, ?, ?>) proxyForValidation; + if(virtualHost.getGlobalAddressDomains() != null) + { + for(String name : virtualHost.getGlobalAddressDomains()) + { + validateGlobalAddressDomain(name); + } + } + } + } + + private void validateGlobalAddressDomain(final String name) + { + String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+"; + if(!name.matches(regex)) + { + throw new IllegalArgumentException("'"+name+"' is not a valid global address domain"); + } } @Override @@ -243,8 +281,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { super.validateOnCreate(); validateMessageStoreCreation(); + if(getGlobalAddressDomains() != null) + { + for(String name : getGlobalAddressDomains()) + { + validateGlobalAddressDomain(name); + } + } } + + private void validateMessageStoreCreation() { MessageStore store = createMessageStore(); @@ -293,10 +340,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - addChangeListener(new StoreUpdatingChangeListener()); + _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); - _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT); + QpidServiceLoader serviceLoader = new QpidServiceLoader(); + for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class)) + { + if((_enabledConnectionValidators.isEmpty() + && (_disabledConnectionValidators.isEmpty()) || !_disabledConnectionValidators.contains(validator.getType())) + || _enabledConnectionValidators.contains(validator.getType())) + { + _connectionValidators.add(validator); + } + + } } private void checkVHostStateIsActive() @@ -438,6 +495,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _eventLogger; } + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + { + getSecurityManager().authoriseCreateConnection(connection); + for(ConnectionValidator validator : _connectionValidators) + { + if(!validator.validateConnectionCreation(connection)) + { + return false; + } + } + return true; + } + /** * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers * and checking for idle or open transactions that have exceeded the permitted thresholds. @@ -526,9 +597,42 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override + public List<String> getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List<String> getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + @Override + public List<String> getGlobalAddressDomains() + { + return _globalAddressDomains; + } + + @Override public AMQQueue<?> getQueue(String name) { - return (AMQQueue<?>) getChildByName(Queue.class, name); + AMQQueue<?> childByName = (AMQQueue<?>) getChildByName(Queue.class, name); + if(childByName == null && getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(name.startsWith(domain + "/")) + { + childByName = (AMQQueue<?>) getChildByName(Queue.class,name.substring(domain.length())); + if(childByName != null) + { + break; + } + } + } + } + return childByName; } @Override @@ -556,14 +660,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { int purged = queue.deleteAndReturnCount(); - if (queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStore store = getDurableConfigurationStore(); - store.remove(queue.asObjectRecord()); - } return purged; } @@ -614,7 +710,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public ExchangeImpl getExchange(String name) { - return getChildByName(ExchangeImpl.class,name); + ExchangeImpl childByName = getChildByName(ExchangeImpl.class, name); + if(childByName == null && getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(name.startsWith(domain + "/")) + { + childByName = getChildByName(ExchangeImpl.class,name.substring(domain.length())); + if(childByName != null) + { + break; + } + } + } + } + return childByName; } @Override @@ -671,6 +782,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte exchange.deleteWithChecks(); } + @Override + public String getLocalAddress(final String routingAddress) + { + String localAddress = routingAddress; + if(getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(localAddress.length() > routingAddress.length() - domain.length() && routingAddress.startsWith(domain + "/")) + { + localAddress = routingAddress.substring(domain.length()); + } + } + } + return localAddress; + } + public SecurityManager getSecurityManager() { return _broker.getSecurityManager(); @@ -886,6 +1014,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return null; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() @@ -1390,14 +1524,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return total; } - @Override - protected void onCreate() - { - super.onCreate(); - ConfiguredObjectRecord record = asObjectRecord(); - getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); - } - @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) private void onActivate() { @@ -1509,44 +1635,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte onActivate(); } - private class StoreUpdatingChangeListener implements ConfigurationChangeListener - { - @Override - public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) - { - if (object == AbstractVirtualHost.this && isDurable() && newState == State.DELETED) - { - getDurableConfigurationStore().remove(asObjectRecord()); - object.removeChangeListener(this); - } - } - - @Override - public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) - { - - } - - @Override - public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) - { - - } - - @Override - public void attributeSet(final ConfiguredObject<?> object, - final String attributeName, - final Object oldAttributeValue, - final Object newAttributeValue) - { - if (object == AbstractVirtualHost.this && isDurable() && getState() != State.DELETED && isAttributePersisted(attributeName) - && !(attributeName.equals(VirtualHost.DESIRED_STATE) && newAttributeValue.equals(State.DELETED))) - { - getDurableConfigurationStore().update(false, asObjectRecord()); - } - } - } - private class FileSystemSpaceChecker extends HouseKeepingTask { private boolean _fileSystemFull; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 29729b6c7d..dff8a80dd9 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.NoFactoryForTypeException; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -108,4 +109,7 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM EventLogger getEventLogger(); + boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection); + + String getLocalAddress(String routingAddress); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java new file mode 100644 index 0000000000..5e87b2e511 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.NonStandardVirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>> + extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, + NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java new file mode 100644 index 0000000000..cacc981e9b --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java @@ -0,0 +1,517 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; + +@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false ) +class RedirectingVirtualHostImpl + extends AbstractConfiguredObject<RedirectingVirtualHostImpl> + implements RedirectingVirtualHost<RedirectingVirtualHostImpl> +{ + public static final String TYPE = "REDIRECTOR"; + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedAttributeField + private List<String> _enabledConnectionValidators; + + @ManagedAttributeField + private List<String> _disabledConnectionValidators; + + @ManagedAttributeField + private List<String> _globalAddressDomains; + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + setState(State.UNAVAILABLE); + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + throwUnsupportedForRedirector(); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl<?> exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + throwUnsupportedForRedirector(); + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue<?> createQueue(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForRedirector(); + } + + @Override + public Collection<String> getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection<VirtualHostAlias> getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection<Connection> getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue<?>> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue<?> queue) + { + throwUnsupportedForRedirector(); + return 0; + } + + @Override + public Collection<ExchangeImpl<?>> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public void setTargetSize(final long targetSize) + { + + } + + @Override + public long getTotalQueueDepthBytes() + { + return 0l; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + throwUnsupportedForRedirector(); + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + throwUnsupportedForRedirector(); + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + { + return false; + } + + @Override + public List<String> getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List<String> getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + @Override + public List<String> getGlobalAddressDomains() + { + return _globalAddressDomains; + } + + @Override + public String getLocalAddress(final String routingAddress) + { + String localAddress = routingAddress; + if(getGlobalAddressDomains() != null) + { + for(String domain : getGlobalAddressDomains()) + { + if(localAddress.length() > routingAddress.length() - domain.length() && routingAddress.startsWith(domain + "/")) + { + localAddress = routingAddress.substring(domain.length()); + } + } + } + return localAddress; + } + + private void throwUnsupportedForRedirector() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java new file mode 100644 index 0000000000..636681db72 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Map; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHostNode; + +@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()") +public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X> +{ + + @ManagedAttribute( defaultValue = "{}") + Map<Port<?>, String> getRedirects(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java new file mode 100644 index 0000000000..c94d113514 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.DurableConfigurationStore; + + +public class RedirectingVirtualHostNodeImpl + extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class); + public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector"; + + + @ManagedAttributeField + private String _virtualHostInitialConfiguration; + + @ManagedAttributeField + private Map<Port<?>,String> _redirects; + + private RedirectingVirtualHostImpl _virtualHost; + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent) + { + super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent), + attributes); + } + + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + protected void doActivate() + { + try + { + _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this); + _virtualHost.create(); + setState(State.ACTIVE); + } + catch(RuntimeException e) + { + setState(State.ERRORED); + if (getParent(Broker.class).isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", e); + } + else + { + throw e; + } + } + } + + @Override + public String getVirtualHostInitialConfiguration() + { + return _virtualHostInitialConfiguration; + } + + @Override + public VirtualHost<?, ?, ?> getVirtualHost() + { + return _virtualHost; + } + + @Override + public DurableConfigurationStore getConfigurationStore() + { + return null; + } + + @Override + public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes() + { + return Collections.emptySet(); + } + + @Override + public Map<Port<?>, String> getRedirects() + { + return _redirects; + } + + public static Map<String, Collection<String>> getSupportedChildTypes() + { + Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE); + return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes); + } + +} diff --git a/qpid/java/broker-core/src/main/resources/system.properties b/qpid/java/broker-core/src/main/resources/system.properties deleted file mode 100644 index 661b0cba77..0000000000 --- a/qpid/java/broker-core/src/main/resources/system.properties +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java index 8c115c6e62..2846950bc1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java @@ -309,4 +309,14 @@ public class BrokerOptionsTest extends QpidTestCase assertEquals("unexpected number of entries", 2, props.keySet().size()); assertEquals("value", props.get("name")); } + + + public void testSetInitialSystemProperties() + { + assertNull("Unexpected default value for initial system properties", _options.getInitialSystemProperties()); + + _options.setInitialSystemProperties("test.properties"); + + assertEquals("test.properties", _options.getInitialSystemProperties()); + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java new file mode 100644 index 0000000000..195414c7d7 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + + +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; +import org.codehaus.jackson.map.ObjectMapper; + +public class BrokerTest extends QpidTestCase +{ + private static final String INITIAL_SYSTEM_PROPERTY = "test"; + private static final String INITIAL_SYSTEM_PROPERTY_VALUE = "testValue"; + + private File _initialSystemProperties; + private File _initialConfiguration; + private File _brokerWork; + private Broker _broker; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + // create empty initial configuration + Map<String,Object> initialConfig = new HashMap<>(); + initialConfig.put(ConfiguredObject.NAME, "test"); + initialConfig.put(org.apache.qpid.server.model.Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION); + + ObjectMapper mapper = new ObjectMapper(); + String config = mapper.writeValueAsString(initialConfig); + _initialConfiguration = TestFileUtils.createTempFile(this, ".initial-config.json", config); + _brokerWork = TestFileUtils.createTestDirectory("qpid-work", true); + _initialSystemProperties = TestFileUtils.createTempFile(this, ".initial-system.properties", + INITIAL_SYSTEM_PROPERTY + "=" + INITIAL_SYSTEM_PROPERTY_VALUE + + "\nQPID_WORK=" + _brokerWork.getAbsolutePath() + "_test"); + setTestSystemProperty("QPID_WORK", _brokerWork.getAbsolutePath()); + } + + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_broker != null) + { + _broker.shutdown(); + } + System.clearProperty(INITIAL_SYSTEM_PROPERTY); + FileUtils.delete(_brokerWork, true); + FileUtils.delete(_initialSystemProperties, false); + FileUtils.delete(_initialConfiguration, false); + } + } + + public void testInitialSystemPropertiesAreSetOnBrokerStartup() throws Exception + { + BrokerOptions options = new BrokerOptions(); + options.setInitialSystemProperties(_initialSystemProperties.getAbsolutePath()); + options.setSkipLoggingConfiguration(true); + options.setStartupLoggedToSystemOut(true); + options.setInitialConfigurationLocation(_initialConfiguration.getAbsolutePath()); + _broker = new Broker(); + _broker.startup(options); + + // test JVM system property should be set from initial system config file + assertEquals("Unexpected JVM system property", INITIAL_SYSTEM_PROPERTY_VALUE, System.getProperty(INITIAL_SYSTEM_PROPERTY)); + + // existing system property should not be overridden + assertEquals("Unexpected QPID_WORK system property", _brokerWork.getAbsolutePath(), System.getProperty("QPID_WORK")); + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java index 93fa9114fb..d004f16466 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java @@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; public class BindingImplTest extends QpidTestCase @@ -57,7 +59,11 @@ public class BindingImplTest extends QpidTestCase attributes.put(Binding.ARGUMENTS, arguments); attributes.put(Binding.NAME, getTestName()); AMQQueue queue = mock(AMQQueue.class); + VirtualHostImpl vhost = mock(VirtualHostImpl.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(vhost.getSecurityManager()).thenReturn(securityManager); when(queue.getTaskExecutor()).thenReturn(_taskExecutor); + when(queue.getVirtualHost()).thenReturn(vhost); when(queue.getModel()).thenReturn(_model); ExchangeImpl exchange = mock(ExchangeImpl.class); when(exchange.getTaskExecutor()).thenReturn(_taskExecutor); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java index 14ff640c57..f5a9217ef3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java @@ -57,6 +57,7 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase notifyBrokerStarted(); UUID id = UUID.randomUUID(); ConfiguredObject object = mock(VirtualHost.class); + when(object.isDurable()).thenReturn(true); when(object.getId()).thenReturn(id); ConfiguredObjectRecord record = mock(ConfiguredObjectRecord.class); when(object.asObjectRecord()).thenReturn(record); @@ -69,11 +70,13 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase notifyBrokerStarted(); Broker broker = mock(Broker.class); when(broker.getCategoryClass()).thenReturn(Broker.class); + when(broker.isDurable()).thenReturn(true); VirtualHost child = mock(VirtualHost.class); when(child.getCategoryClass()).thenReturn(VirtualHost.class); Model model = mock(Model.class); when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>emptyList()); when(child.getModel()).thenReturn(model); + when(child.isDurable()).thenReturn(true); _listener.childAdded(broker, child); verify(_store).update(eq(true), any(ConfiguredObjectRecord.class)); } @@ -83,15 +86,17 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase notifyBrokerStarted(); Broker broker = mock(Broker.class); when(broker.getCategoryClass()).thenReturn(Broker.class); + when(broker.isDurable()).thenReturn(true); _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1); verify(_store).update(eq(false),any(ConfiguredObjectRecord.class)); } - public void testChildAddedForVirtualHostNode() + public void testChildAddedWhereParentManagesChildStorage() { notifyBrokerStarted(); VirtualHostNode<?> object = mock(VirtualHostNode.class); + when(object.managesChildStorage()).thenReturn(true); VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class); _listener.childAdded(object, virtualHost); verifyNoMoreInteractions(_store); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index ca440bc432..d625fcba75 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -103,16 +102,23 @@ public class MockConsumer implements ConsumerTarget { if(_messageIds != null) { - SimpleFilterManager filters = new SimpleFilterManager(); - filters.add(new MessageFilter() + FilterManager filters = new FilterManager(); + MessageFilter filter = new MessageFilter() { @Override + public String getName() + { + return ""; + } + + @Override public boolean matches(final Filterable message) { final String messageId = message.getMessageHeader().getMessageId(); return _messageIds.contains(messageId); } - }); + }; + filters.add(filter.getName(), filter); return filters; } else diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 4485d5cc85..c21a386eaa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -45,6 +45,7 @@ import org.mockito.stubbing.Answer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener; @@ -66,6 +67,7 @@ public class VirtualHostTest extends QpidTestCase private VirtualHostNode<?> _virtualHostNode; private DurableConfigurationStore _configStore; private VirtualHost<?, ?, ?> _virtualHost; + private StoreConfigurationChangeListener _storeConfigurationChangeListener; @Override protected void setUp() throws Exception @@ -79,9 +81,13 @@ public class VirtualHostTest extends QpidTestCase when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); _virtualHostNode = mock(VirtualHostNode.class); + when(_virtualHostNode.isDurable()).thenReturn(true); _configStore = mock(DurableConfigurationStore.class); + _storeConfigurationChangeListener = new StoreConfigurationChangeListener(_configStore); + when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore); + // Virtualhost needs the EventLogger from the SystemContext. when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker); @@ -122,7 +128,7 @@ public class VirtualHostTest extends QpidTestCase assertEquals("Unexpected name", virtualHostName, virtualHost.getName()); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - verify(_configStore).create(matchesRecord(virtualHost.getId(), virtualHost.getType())); + verify(_configStore).update(eq(true),matchesRecord(virtualHost.getId(), virtualHost.getType())); } public void testDeleteVirtualHost() @@ -170,7 +176,7 @@ public class VirtualHostTest extends QpidTestCase virtualHost.start(); assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState()); - verify(_configStore, times(1)).create(matchesRecord(virtualHost.getId(), virtualHost.getType())); + verify(_configStore, times(1)).update(eq(true), matchesRecord(virtualHost.getId(), virtualHost.getType())); verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType())); } @@ -293,7 +299,7 @@ public class VirtualHostTest extends QpidTestCase assertNotNull(queue.getId()); assertEquals(queueName, queue.getName()); - verify(_configStore).create(matchesRecord(queue.getId(), queue.getType())); + verify(_configStore).update(eq(true),matchesRecord(queue.getId(), queue.getType())); } public void testCreateNonDurableQueue() @@ -396,7 +402,10 @@ public class VirtualHostTest extends QpidTestCase attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE); TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode); + host.addChangeListener(_storeConfigurationChangeListener); host.create(); + // Fire the child added event on the node + _storeConfigurationChangeListener.childAdded(_virtualHostNode,host); _virtualHost = host; return host; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java index 46f205116a..081fbe4edf 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java @@ -22,14 +22,18 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.Subject; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.test.utils.QpidTestCase; @@ -476,4 +480,84 @@ public class AbstractConfiguredObjectTest extends QpidTestCase } + + public void testAttributeSetListenerFiring() + { + final String objectName = "listenerFiring"; + + Map<String, Object> attributes = new HashMap<>(); + attributes.put(ConfiguredObject.NAME, objectName); + attributes.put(TestSingleton.STRING_VALUE, "first"); + + final TestSingleton object = _model.getObjectFactory().create(TestSingleton.class, attributes); + + final AtomicInteger listenerCount = new AtomicInteger(); + final LinkedHashMap<String, String> updates = new LinkedHashMap<>(); + object.addChangeListener(new NoopConfigurationChangeListener() + { + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + listenerCount.incrementAndGet(); + String delta = String.valueOf(oldAttributeValue) + "=>" + String.valueOf(newAttributeValue); + updates.put(attributeName, delta); + } + }); + + // Set updated value (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second")); + + assertEquals(1, listenerCount.get()); + String delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("first=>second", delta); + + // Set unchanged value (should not cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second")); + assertEquals(1, listenerCount.get()); + + // Set value to null (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null)); + assertEquals(2, listenerCount.get()); + delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("second=>null", delta); + + // Set to null again (should not cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null)); + assertEquals(2, listenerCount.get()); + + // Set updated value (should cause listener to fire) + object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "third")); + assertEquals(3, listenerCount.get()); + delta = updates.remove(TestSingleton.STRING_VALUE); + assertEquals("null=>third", delta); + } + + private static class NoopConfigurationChangeListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState) + { + } + + @Override + public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + } + + @Override + public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) + { + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + } + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index 799fc71d74..2427470707 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -93,6 +93,7 @@ public class LastValueQueueListTest extends TestCase ServerMessage message = createTestServerMessage(null); QueueEntry addedEntry = _list.add(message); + addedEntry.acquire(); addedEntry.delete(); int numberOfEntries = countEntries(_list); @@ -113,6 +114,7 @@ public class LastValueQueueListTest extends TestCase ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); QueueEntry addedEntry = _list.add(message); + addedEntry.acquire(); addedEntry.delete(); int numberOfEntries = countEntries(_list); @@ -173,6 +175,7 @@ public class LastValueQueueListTest extends TestCase assertEquals(1, countEntries(_list)); assertEquals(1, _list.getLatestValuesMap().size()); + addedEntry.acquire(); addedEntry.delete(); assertEquals(0, countEntries(_list)); @@ -193,7 +196,9 @@ public class LastValueQueueListTest extends TestCase assertEquals(2, countEntries(_list)); assertEquals(2, _list.getLatestValuesMap().size()); + addedEntry1.acquire(); addedEntry1.delete(); + addedEntry2.acquire(); addedEntry2.delete(); assertEquals(0, countEntries(_list)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 40b6c1bebd..2101a2297f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -113,6 +113,7 @@ public abstract class QueueEntryImplTestBase extends TestCase */ private void delete() { + _queueEntry.acquire(); _queueEntry.delete(); assertTrue("Queue entry should be in DELETED state after invoking of delete method", _queueEntry.isDeleted()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index a0ab7cd454..7f5ea06dcf 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -196,6 +196,7 @@ public abstract class QueueEntryListTestBase extends TestCase final QueueEntry head = getTestList().getHead(); final QueueEntry first = getTestList().next(head); + first.acquire(); first.delete(); final QueueEntry second = getTestList().next(head); @@ -226,6 +227,7 @@ public abstract class QueueEntryListTestBase extends TestCase assertNull(list.next(queueEntry2)); //'delete' the 2nd QueueEntry + queueEntry2.acquire(); queueEntry2.delete(); assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index a2d314d629..a27db98400 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -109,6 +109,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase public void testTraverseWithDeletedEntries() { // Delete 2nd queue entry + _queueEntry2.acquire(); _queueEntry2.delete(); assertTrue(_queueEntry2.isDeleted()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index d9a176c688..f88ce5f5f9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -137,6 +137,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase public void testTraverseWithDeletedEntries() { // Delete 2nd queue entry + _queueEntry3.acquire(); _queueEntry3.delete(); assertTrue(_queueEntry3.isDeleted()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 28d22a5a44..73d14a843f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -155,7 +155,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase public void testScavenge() throws Exception { - OrderedQueueEntryList sqel = new StandardQueueEntryList(null); + OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class)); ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>(); @@ -215,6 +215,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase { QueueEntry entry = entriesMap.remove(pos); boolean wasDeleted = entry.isDeleted(); + entry.acquire(); entry.delete(); return entry.isDeleted() && !wasDeleted; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java new file mode 100644 index 0000000000..9d47ed496a --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFileAttributeView; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; + +import org.apache.qpid.test.utils.QpidTestCase; + +public class FileHelperTest extends QpidTestCase +{ + private static final String TEST_FILE_PERMISSIONS = "rwxr-x---"; + private File _testFile; + private FileHelper _fileHelper; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _testFile = new File(TMP_FOLDER, "test-" + System.currentTimeMillis()); + _fileHelper = new FileHelper(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + Files.deleteIfExists(_testFile.toPath()); + } + } + + public void testCreateNewFile() throws Exception + { + assertFalse("File should not exist", _testFile.exists()); + Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS); + assertTrue("File was not created", path.toFile().exists()); + if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class)) + { + assertPermissions(path); + } + } + + public void testCreateNewFileUsingRelativePath() throws Exception + { + _testFile = new File("./tmp-" + System.currentTimeMillis()); + assertFalse("File should not exist", _testFile.exists()); + Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS); + assertTrue("File was not created", path.toFile().exists()); + if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class)) + { + assertPermissions(path); + } + } + + public void testWriteFileSafely() throws Exception + { + Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS); + _fileHelper.writeFileSafely(path, new BaseAction<File, IOException>() + { + @Override + public void performAction(File file) throws IOException + { + Files.write(file.toPath(), "test".getBytes("UTF8")); + assertEquals("Unexpected name", _testFile.getAbsolutePath() + ".tmp", file.getPath()); + } + }); + + assertTrue("File was not created", path.toFile().exists()); + + if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class)) + { + assertPermissions(path); + } + + String content = new String(Files.readAllBytes(path), "UTF-8"); + assertEquals("Unexpected file content", "test", content); + } + + public void testAtomicFileMoveOrReplace() throws Exception + { + Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS); + Files.write(path, "test".getBytes("UTF8")); + _testFile = _fileHelper.atomicFileMoveOrReplace(path, path.resolveSibling(_testFile.getName() + ".target")).toFile(); + + assertFalse("File was not moved", path.toFile().exists()); + assertTrue("Target file does not exist", _testFile.exists()); + + if (Files.getFileStore(_testFile.toPath()).supportsFileAttributeView(PosixFileAttributeView.class)) + { + assertPermissions(_testFile.toPath()); + } + } + + + private void assertPermissions(Path path) throws IOException + { + Set<PosixFilePermission> permissions = Files.getPosixFilePermissions(path); + assertTrue("Unexpected owner read permission", permissions.contains(PosixFilePermission.OWNER_READ)); + assertTrue("Unexpected owner write permission", permissions.contains(PosixFilePermission.OWNER_WRITE)); + assertTrue("Unexpected owner exec permission", permissions.contains(PosixFilePermission.OWNER_EXECUTE)); + assertTrue("Unexpected group read permission", permissions.contains(PosixFilePermission.GROUP_READ)); + assertFalse("Unexpected group write permission", permissions.contains(PosixFilePermission.GROUP_WRITE)); + assertTrue("Unexpected group exec permission", permissions.contains(PosixFilePermission.GROUP_EXECUTE)); + assertFalse("Unexpected others read permission", permissions.contains(PosixFilePermission.OTHERS_READ)); + assertFalse("Unexpected others write permission", permissions.contains(PosixFilePermission.OTHERS_WRITE)); + assertFalse("Unexpected others exec permission", permissions.contains(PosixFilePermission.OTHERS_EXECUTE)); + } +} |
