summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-03 14:56:40 +0000
committerKeith Wall <kwall@apache.org>2015-03-03 14:56:40 +0000
commit9dc57fe738f366d875c2319dafdfa2c50ce2f20b (patch)
treebe6634866a966f358fcb1ba6ba29dfb5c9c340c1 /qpid/java/broker-core/src
parentfe37626d4fd8fb3ee5b3146a5159024a3d6d3357 (diff)
downloadqpid-python-9dc57fe738f366d875c2319dafdfa2c50ce2f20b.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java37
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java57
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java105
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java91
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java128
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java95
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java105
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java83
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java97
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java133
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java206
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java517
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java36
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java124
-rw-r--r--qpid/java/broker-core/src/main/resources/system.properties20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java101
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java14
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java84
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java138
77 files changed, 2403 insertions, 549 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index 6c50fe7cfd..e88763dd1d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -23,9 +23,12 @@ package org.apache.qpid.server;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URL;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
@@ -34,6 +37,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
@@ -154,6 +158,8 @@ public class Broker implements BrokerShutdownProvider
private void startupImpl(final BrokerOptions options) throws Exception
{
+ populateSystemPropertiesFromDefaults(options.getInitialSystemProperties());
+
String storeLocation = options.getConfigurationStoreLocation();
String storeType = options.getConfigurationStoreType();
@@ -321,6 +327,37 @@ public class Broker implements BrokerShutdownProvider
}
}
+ public static void populateSystemPropertiesFromDefaults(final String initialProperties) throws IOException
+ {
+ URL initialPropertiesLocation;
+ if(initialProperties == null)
+ {
+ initialPropertiesLocation = Broker.class.getClassLoader().getResource("system.properties");
+ }
+ else
+ {
+ initialPropertiesLocation = (new File(initialProperties)).toURI().toURL();
+ }
+
+ Properties props = new Properties(QpidProperties.asProperties());
+ if(initialPropertiesLocation != null)
+ {
+
+ try(InputStream inStream = initialPropertiesLocation.openStream())
+ {
+ props.load(inStream);
+ }
+ }
+
+ Set<String> propertyNames = new HashSet<>(props.stringPropertyNames());
+ propertyNames.removeAll(System.getProperties().stringPropertyNames());
+ for (String propName : propertyNames)
+ {
+ System.setProperty(propName, props.getProperty(propName));
+ }
+ }
+
+
private class ShutdownService implements Runnable
{
public void run()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
index 59075dfb57..ff3d9063f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
@@ -78,6 +78,7 @@ public class BrokerOptions
private boolean _overwriteConfigurationStore;
private Map<String, String> _configProperties = new HashMap<String,String>();
private boolean _startupLoggedToSystemOut = true;
+ private String _initialSystemProperties;
public Map<String, Object> convertToSystemConfigAttributes()
{
@@ -390,4 +391,24 @@ public class BrokerOptions
{
this._startupLoggedToSystemOut = startupLoggedToSystemOut;
}
+
+ /**
+ * Get the location of initial JVM system properties to set. This can be URL or a file path
+ *
+ * @return the location of initial JVM system properties to set.
+ */
+ public String getInitialSystemProperties()
+ {
+ return _initialSystemProperties;
+ }
+
+ /**
+ * Set the location of initial properties file to set as JVM system properties. This can be URL or a file path
+ *
+ * @param initialSystemProperties the location of initial JVM system properties.
+ */
+ public void setInitialSystemProperties(String initialSystemProperties)
+ {
+ _initialSystemProperties = initialSystemProperties;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 76c6b6007f..6012e2e8db 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.binding;
-import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -45,10 +44,8 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class BindingImpl
extends AbstractConfiguredObject<BindingImpl>
@@ -108,26 +105,6 @@ public class BindingImpl
}
}
- @Override
- protected void onCreate()
- {
- super.onCreate();
- try
- {
- _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
- }
- catch(AccessControlException e)
- {
- deleted();
- throw e;
- }
- if (isDurable())
- {
- _queue.getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
- }
-
- }
-
private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes,
final AMQQueue queue,
final ExchangeImpl exchange)
@@ -263,12 +240,6 @@ public class BindingImpl
{
_arguments = arguments;
BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
- if (isDurable())
- {
- VirtualHostImpl<?, ?, ?> vhost =
- (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
- vhost.getDurableConfigurationStore().update(true, asObjectRecord());
- }
}
}
);
@@ -278,6 +249,8 @@ public class BindingImpl
@Override
public void validateOnCreate()
{
+ _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+
AMQQueue queue = getAMQQueue();
Map<String, Object> arguments = getArguments();
if (arguments!=null && !arguments.isEmpty() && FilterSupport.argumentsContainFilter(arguments))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
index 765e1e4fa5..d6dbe37a6b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
@@ -48,6 +48,7 @@ public class BrokerProperties
public static final String PROPERTY_QPID_HOME = "QPID_HOME";
public static final String PROPERTY_QPID_WORK = "QPID_WORK";
public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size";
+ public static final String POSIX_FILE_PERMISSIONS = "qpid.default_posix_file_permissions";
private BrokerProperties()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
index 21715f7406..d5bbe16446 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
public class StoreConfigurationChangeListener implements ConfigurationChangeListener
@@ -43,7 +42,10 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList
{
if (newState == State.DELETED)
{
- _store.remove(object.asObjectRecord());
+ if(object.isDurable())
+ {
+ _store.remove(object.asObjectRecord());
+ }
object.removeChangeListener(this);
}
}
@@ -51,20 +53,23 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList
@Override
public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
{
- // exclude VirtualHostNode children from storing in broker store
- if (!(object instanceof VirtualHostNode))
+ if (!object.managesChildStorage())
{
- child.addChangeListener(this);
- _store.update(true,child.asObjectRecord());
+ if(object.isDurable() && child.isDurable())
+ {
+ child.addChangeListener(this);
+ _store.update(true, child.asObjectRecord());
- Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
- Collection<Class<? extends ConfiguredObject>> childTypes = child.getModel().getChildTypes(categoryClass);
+ Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
+ Collection<Class<? extends ConfiguredObject>> childTypes =
+ child.getModel().getChildTypes(categoryClass);
- for(Class<? extends ConfiguredObject> childClass : childTypes)
- {
- for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ for (Class<? extends ConfiguredObject> childClass : childTypes)
{
- childAdded(child, grandchild);
+ for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ {
+ childAdded(child, grandchild);
+ }
}
}
}
@@ -74,14 +79,20 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList
@Override
public void childRemoved(ConfiguredObject object, ConfiguredObject child)
{
- _store.remove(child.asObjectRecord());
+ if(child.isDurable())
+ {
+ _store.remove(child.asObjectRecord());
+ }
child.removeChangeListener(this);
}
@Override
public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
- _store.update(false, object.asObjectRecord());
+ if(object.isDurable())
+ {
+ _store.update(false, object.asObjectRecord());
+ }
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 6587bc76b2..cf23e3dd91 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -177,17 +177,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- protected void onCreate()
- {
- super.onCreate();
- if(isDurable())
- {
- getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
- }
-
- }
-
- @Override
public EventLogger getEventLogger()
{
return _virtualHost.getEventLogger();
@@ -213,12 +202,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
throw new RequiredExchangeException(getName());
}
- if (isDurable() && !isAutoDelete())
- {
- getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
- }
-
if(_closed.compareAndSet(false,true))
{
List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
@@ -241,11 +224,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
_closeTaskList.clear();
- if (isDurable() && !isAutoDelete())
- {
- getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
- }
}
deleted();
}
@@ -665,10 +643,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
doRemoveBinding(b);
queue.removeBinding(b);
- if (b.isDurable())
- {
- _virtualHost.getDurableConfigurationStore().remove(b.asObjectRecord());
- }
b.delete();
}
@@ -905,10 +879,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
protected void changeAttributes(final Map<String, Object> attributes)
{
super.changeAttributes(attributes);
- if (isDurable() && getState() != State.DELETED)
- {
- this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
- }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index 127a8d9e52..fcc34ee4de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -62,7 +62,8 @@ public class DefaultDestination implements MessageDestination
final AMQQueue q = _virtualHost.getQueue(routingAddress);
if(q == null)
{
- if(routingAddress != null && routingAddress.contains("/") && !routingAddress.startsWith("/"))
+ routingAddress = _virtualHost.getLocalAddress(routingAddress);
+ if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
{
String[] parts = routingAddress.split("/",2);
ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
@@ -71,7 +72,7 @@ public class DefaultDestination implements MessageDestination
return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
}
}
- else if(routingAddress == null || !routingAddress.contains("/"))
+ else if(!routingAddress.contains("/"))
{
ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
if(exchange != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 597fc44e4c..de796a846a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -87,6 +87,12 @@ class HeadersBinding
+"' with arguments: " + _binding.getArguments());
_filter = new MessageFilter()
{
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
@Override
public boolean matches(Filterable message)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
new file mode 100644
index 0000000000..dbd6a5f6f6
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+
+public final class ArrivalTimeFilter implements MessageFilter
+{
+ private final long _startingFrom;
+
+ public ArrivalTimeFilter(final long startingFrom)
+ {
+ _startingFrom = startingFrom;
+ }
+
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.REPLAY_PERIOD.toString();
+ }
+
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ return message.getArrivalTime() >= _startingFrom;
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
new file mode 100644
index 0000000000..8c55c8ac76
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.List;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public final class ArrivalTimeFilterFactory implements MessageFilterFactory
+{
+
+ @Override
+ public MessageFilter newInstance(final List<String> arguments)
+ {
+ if(arguments == null || arguments.size() != 1)
+ {
+ throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
+ }
+ String arg = arguments.get(0);
+ long startingFrom= Long.parseLong(arg);
+
+ return new ArrivalTimeFilter(startingFrom);
+ }
+
+ @Override
+ public String getType()
+ {
+ return AMQPFilterTypes.REPLAY_PERIOD.toString();
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
index 69fc520a8f..ad14fa423a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -14,26 +14,62 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
*
+ *
*/
package org.apache.qpid.server.filter;
-//
-// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
-//
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-public interface FilterManager
+public class FilterManager
{
- void add(MessageFilter filter);
- void remove(MessageFilter filter);
+ private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>();
+
+ public FilterManager()
+ {
+ }
+
+ public void add(String name, MessageFilter filter)
+ {
+ _filters.put(name, filter);
+ }
+
+ public boolean allAllow(Filterable msg)
+ {
+ for (MessageFilter filter : _filters.values())
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Iterator<MessageFilter> filters()
+ {
+ return _filters.values().iterator();
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+
+ public boolean hasFilter(final String name)
+ {
+ return _filters.containsKey(name);
+ }
- boolean allAllow(Filterable msg);
+ @Override
+ public String toString()
+ {
+ return _filters.toString();
+ }
- Iterator<MessageFilter> filters();
- boolean hasFilters();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index a159a8506b..28f7fe3554 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.filter;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import java.util.Map;
-
public class FilterManagerFactory
{
@@ -54,20 +54,13 @@ public class FilterManagerFactory
if (selector instanceof String && !selector.equals(""))
{
- manager = new SimpleFilterManager();
+ manager = new FilterManager();
try
{
- manager.add(new JMSSelectorFilter((String)selector));
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
+ MessageFilter filter = new JMSSelectorFilter((String)selector);
+ manager.add(filter.getName(), filter);
}
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index d0b1670a45..6b8ae2f552 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -26,12 +26,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.queue.AMQQueue;
public class FilterSupport
@@ -57,15 +59,7 @@ public class FilterSupport
{
selector = new JMSSelectorFilter(selectorString);
}
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
@@ -119,6 +113,7 @@ public class FilterSupport
}
}
+ @PluggableService
public static final class NoLocalFilter implements MessageFilter
{
private final MessageSource _queue;
@@ -128,6 +123,12 @@ public class FilterSupport
_queue = queue;
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.NO_LOCAL.toString();
+ }
+
public boolean matches(Filterable message)
{
@@ -165,6 +166,8 @@ public class FilterSupport
{
return _queue != null ? _queue.hashCode() : 0;
}
+
+
}
static final class CompoundFilter implements MessageFilter
@@ -178,6 +181,12 @@ public class FilterSupport
_jmsSelectorFilter = jmsSelectorFilter;
}
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
public boolean matches(Filterable message)
{
return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
@@ -216,5 +225,7 @@ public class FilterSupport
result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
return result;
}
+
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index 589e888059..295f9ae074 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -34,6 +34,10 @@ public interface Filterable
Object getConnectionReference();
+ long getMessageNumber();
+
+ long getArrivalTime();
+
public class Factory
{
@@ -41,6 +45,7 @@ public interface Filterable
{
return new Filterable()
{
+
@Override
public AMQMessageHeader getMessageHeader()
{
@@ -64,6 +69,18 @@ public interface Filterable
{
return message.getConnectionReference();
}
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return message.getArrivalTime();
+ }
};
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 744e4e4e9d..a36049cd23 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.BooleanExpression;
import org.apache.qpid.filter.FilterableMessage;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.SelectorParser;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.PluggableService;
+@PluggableService
public class JMSSelectorFilter implements MessageFilter
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
@@ -46,6 +50,12 @@ public class JMSSelectorFilter implements MessageFilter
_matcher = new SelectorParser().parse(selector);
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.JMS_SELECTOR.toString();
+ }
+
public boolean matches(Filterable message)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
new file mode 100644
index 0000000000..233edc78cd
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.List;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public final class JMSSelectorFilterFactory implements MessageFilterFactory
+{
+ public String getType()
+ {
+ return AMQPFilterTypes.JMS_SELECTOR.toString();
+ }
+
+ @Override
+ public MessageFilter newInstance(final List<String> arguments)
+ {
+ if(arguments == null || arguments.size() != 1)
+ {
+ throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
+ }
+ String arg = arguments.get(0);
+ try
+ {
+ return new JMSSelectorFilter(arg);
+ }
+ catch (ParseException | TokenMgrError | SelectorParsingException e)
+ {
+ throw new IllegalArgumentException("Cannot create an JMS Selector from '" + arg + "'", e);
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
index d7dbbea166..226d646efd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -22,5 +22,6 @@ package org.apache.qpid.server.filter;
public interface MessageFilter
{
+ String getName();
boolean matches(Filterable message);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
deleted file mode 100644
index 111fb6a333..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.filter;
-
-import org.apache.log4j.Logger;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class SimpleFilterManager implements FilterManager
-{
- private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
-
- private final ConcurrentLinkedQueue<MessageFilter> _filters;
- private String _toString = "";
-
- public SimpleFilterManager()
- {
- _logger.debug("Creating SimpleFilterManager");
- _filters = new ConcurrentLinkedQueue<MessageFilter>();
- }
-
- public SimpleFilterManager(JMSSelectorFilter messageFilter)
- {
- this();
- add(messageFilter);
- }
-
- public void add(MessageFilter filter)
- {
- _filters.add(filter);
- updateStringValue();
- }
-
- public void remove(MessageFilter filter)
- {
- _filters.remove(filter);
- updateStringValue();
- }
-
- public boolean allAllow(Filterable msg)
- {
- for (MessageFilter filter : _filters)
- {
- if (!filter.matches(msg))
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Iterator<MessageFilter> filters()
- {
- return _filters.iterator();
- }
-
- public boolean hasFilters()
- {
- return !_filters.isEmpty();
- }
-
-
- @Override
- public String toString()
- {
- return _toString;
- }
-
- private void updateStringValue()
- {
- StringBuilder toString = new StringBuilder();
- for (MessageFilter filter : _filters)
- {
- toString.append(filter.toString());
- toString.append(",");
- }
-
- if (_filters.size() > 0)
- {
- //Remove the last ','
- toString.deleteCharAt(toString.length()-1);
- }
- _toString = toString.toString();
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index aef769dc4f..76608cffbf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -122,8 +122,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final TaskExecutor _taskExecutor;
private final Class<? extends ConfiguredObject> _category;
+ private final Class<? extends ConfiguredObject> _typeClass;
private final Class<? extends ConfiguredObject> _bestFitInterface;
private final Model _model;
+ private final boolean _managesChildStorage;
+
@ManagedAttributeField
private long _createdTime;
@@ -206,6 +209,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_model = model;
_category = ConfiguredObjectTypeRegistry.getCategory(getClass());
+ Class<? extends ConfiguredObject> typeClass = model.getTypeRegistry().getTypeClass(getClass());
+ _typeClass = typeClass == null ? _category : typeClass;
_attributeTypes = model.getTypeRegistry().getAttributeTypes(getClass());
_automatedFields = model.getTypeRegistry().getAutomatedFields(getClass());
@@ -242,6 +247,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
_type = ConfiguredObjectTypeRegistry.getType(getClass());
+ _managesChildStorage = managesChildren(_category) || managesChildren(_typeClass);
_bestFitInterface = calculateBestFitInterface();
if(attributes.get(TYPE) != null && !_type.equals(attributes.get(TYPE)))
@@ -315,6 +321,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private boolean managesChildren(final Class<? extends ConfiguredObject> clazz)
+ {
+ return clazz.getAnnotation(ManagedObject.class).managesChildren();
+ }
+
private Class<? extends ConfiguredObject> calculateBestFitInterface()
{
Set<Class<? extends ConfiguredObject>> candidates = new HashSet<Class<? extends ConfiguredObject>>();
@@ -1056,11 +1067,24 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return _model;
}
+ @Override
public Class<? extends ConfiguredObject> getCategoryClass()
{
return _category;
}
+ @Override
+ public Class<? extends ConfiguredObject> getTypeClass()
+ {
+ return _typeClass;
+ }
+
+ @Override
+ public boolean managesChildStorage()
+ {
+ return _managesChildStorage;
+ }
+
public Map<String,String> getContext()
{
return _context == null ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(_context);
@@ -1219,8 +1243,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
if(attr != null && (attr.isAutomated() || attr.isDerived()))
{
Object value = attr.getValue((X)this);
- if(value != null && attr.isSecure() &&
- !SecurityManager.isSystemProcess())
+ if(value != null && !SecurityManager.isSystemProcess() && attr.isSecureValue(value))
{
return SECURE_VALUES.get(value.getClass());
}
@@ -1620,8 +1643,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
Object desired = attributes.get(name);
Object expected = getAttribute(name);
- if(((_attributes.get(name) != null && !_attributes.get(name).equals(attributes.get(name)))
- || attributes.get(name) != null)
+ Object currentValue = _attributes.get(name);
+ if(((currentValue != null && !currentValue.equals(desired))
+ || (currentValue == null && desired != null))
&& changeAttribute(name, expected, desired))
{
attributeSet(name, expected, desired);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
index 15e804e6f5..24e62ce7de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
@@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T>
}
};
+ static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>()
+ {
+ @Override
+ public Object convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof String)
+ {
+ return AbstractConfiguredObject.interpolate(object, (String) value);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ return value;
+ }
+ }
+ };
static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>()
{
@Override
@@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T>
}
else if(Map.class.isAssignableFrom(type))
{
- return (AttributeValueConverter<X>) MAP_CONVERTER;
+ if(returnType instanceof ParameterizedType)
+ {
+ Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
+ Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1];
+
+ return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType);
+ }
+ else
+ {
+ return (AttributeValueConverter<X>) MAP_CONVERTER;
+ }
}
else if(Collection.class.isAssignableFrom(type))
{
@@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T>
{
return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type);
}
+ else if(Object.class == type)
+ {
+ return (AttributeValueConverter<X>) OBJECT_CONVERTER;
+ }
throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName());
}
@@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T>
}
}
+ public static class GenericMapConverter extends AttributeValueConverter<Map>
+ {
+
+ private final AttributeValueConverter<?> _keyConverter;
+ private final AttributeValueConverter<?> _valueConverter;
+
+
+ public GenericMapConverter(final Type keyType, final Type valueType)
+ {
+ _keyConverter = getConverter(getRawType(keyType), keyType);
+
+ _valueConverter = getConverter(getRawType(valueType), valueType);
+ }
+
+
+ @Override
+ public Map convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof Map)
+ {
+ Map<?,?> original = (Map<?,?>)value;
+ Map converted = new LinkedHashMap(original.size());
+ for(Map.Entry<?,?> entry : original.entrySet())
+ {
+ converted.put(_keyConverter.convert(entry.getKey(),object),
+ _valueConverter.convert(entry.getValue(), object));
+ }
+ return Collections.unmodifiableMap(converted);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ if(value instanceof String)
+ {
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ return convert(objectMapper.readValue(interpolated, Map.class), object);
+ }
+ catch (IOException e)
+ {
+ // fall through to the non-JSON single object case
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map");
+ }
+
+ }
+ }
+
+
static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X>
{
private final Class<X> _klazz;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
index 9fca898dc0..342b7ac0ba 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.regex.Pattern;
import org.apache.log4j.Logger;
@@ -37,6 +38,7 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend
private final ManagedAttribute _annotation;
private final Method _validValuesMethod;
+ private final Pattern _secureValuePattern;
ConfiguredAutomatedAttribute(final Class<C> clazz,
final Method getter,
@@ -53,6 +55,16 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend
validValuesMethod = getValidValuesMethod(validValue, clazz);
}
_validValuesMethod = validValuesMethod;
+
+ String secureValueFilter = _annotation.secureValueFilter();
+ if (secureValueFilter == null || "".equals(secureValueFilter))
+ {
+ _secureValuePattern = null;
+ }
+ else
+ {
+ _secureValuePattern = Pattern.compile(secureValueFilter);
+ }
}
private Method getValidValuesMethod(final String validValue, final Class<C> clazz)
@@ -140,6 +152,11 @@ public class ConfiguredAutomatedAttribute<C extends ConfiguredObject, T> extend
return _annotation.description();
}
+ public Pattern getSecureValueFilter()
+ {
+ return _secureValuePattern;
+ }
+
public Collection<String> validValues()
{
if(_validValuesMethod != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java
index 71488edb8c..20fd0264c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.model;
import java.lang.reflect.Method;
+import java.util.regex.Pattern;
public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttribute<C,T>
{
private final DerivedAttribute _annotation;
+ private final Pattern _secureValuePattern;
ConfiguredDerivedAttribute(final Class<C> clazz,
final Method getter,
@@ -32,6 +34,16 @@ public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends
{
super(clazz, getter);
_annotation = annotation;
+
+ String secureValueFilter = _annotation.secureValueFilter();
+ if (secureValueFilter == null || "".equals(secureValueFilter))
+ {
+ _secureValuePattern = null;
+ }
+ else
+ {
+ _secureValuePattern = Pattern.compile(secureValueFilter);
+ }
}
public boolean isAutomated()
@@ -72,4 +84,10 @@ public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T> extends
return _annotation.description();
}
+ @Override
+ public Pattern getSecureValueFilter()
+ {
+ return _secureValuePattern;
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 89fda6798b..bfe9c8b15d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -239,6 +239,9 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
Class<? extends ConfiguredObject> getCategoryClass();
+ Class<? extends ConfiguredObject> getTypeClass();
+
+ boolean managesChildStorage();
<C extends ConfiguredObject<C>> C findConfiguredObject(Class<C> clazz, String name);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
index 73b7839a8e..94610a6cb5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.model;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
+import java.util.regex.Pattern;
public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T>
{
@@ -49,6 +50,25 @@ public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> e
public abstract String getDescription();
+ public abstract Pattern getSecureValueFilter();
+
+ public boolean isSecureValue(Object value)
+ {
+ if (isSecure())
+ {
+ Pattern filter = getSecureValueFilter();
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ return filter.matcher(String.valueOf(value)).matches();
+ }
+ }
+ return false;
+ }
+
public T convert(final Object value, C object)
{
final AttributeValueConverter<T> converter = getConverter();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
index 27d914c639..5026df0e19 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -156,7 +156,7 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
factory = categoryFactories.get(_defaultTypes.get(category));
if(factory == null)
{
- throw new NoFactoryForTypeException(category, _defaultTypes.get(category));
+ throw new NoFactoryForTypeException(category, type);
}
}
return factory;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index d134c43bda..d0c6fb041e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -385,7 +385,7 @@ public class ConfiguredObjectTypeRegistry
return null;
}
- private Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
+ public Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
{
String typeName = getType(clazz);
Class<? extends ConfiguredObject> typeClass = null;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java
index e5c17a17e4..6de6bf25c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java
@@ -32,4 +32,5 @@ public @interface DerivedAttribute
boolean persist() default false;
String description() default "";
boolean oversize() default false;
+ String secureValueFilter() default "";
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
index 05b2c610ba..2f96299703 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
@@ -37,4 +37,5 @@ public @interface ManagedAttribute
String[] validValues() default {};
boolean oversize() default false;
String oversizedAltText() default "";
+ String secureValueFilter() default "";
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 46fbaac3f2..ba1f262cfc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.model;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -48,6 +50,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String DEFAULT_FILTERS = "defaultFilters";
+ String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -67,6 +71,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "NONE" )
ExclusivityPolicy getExclusive();
+ @ManagedAttribute( defaultValue = "false" )
+ boolean isEnsureNondestructiveConsumers();
+
@DerivedAttribute( persist = true )
String getOwner();
@@ -155,6 +162,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
long getMaximumMessageTtl();
+ @ManagedAttribute
+ Map<String, Map<String,List<String>>> getDefaultFilters();
+
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
index 21cbfcf194..3b751a3a10 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.model;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ManagedObject;
-
@ManagedObject(category=true, managesChildren=false, creatable=false)
public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X>
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java
index a69808180d..c1a05a9e35 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -37,6 +38,9 @@ public interface SystemConfig<X extends SystemConfig<X>> extends ConfiguredObjec
String INITIAL_CONFIGURATION_LOCATION = "initialConfigurationLocation";
String STARTUP_LOGGED_TO_SYSTEM_OUT = "startupLoggedToSystemOut";
+ @ManagedContextDefault(name = BrokerProperties.POSIX_FILE_PERMISSIONS)
+ String DEFAULT_POSIX_FILE_PERMISSIONS = "rw-r-----";
+
@ManagedAttribute(defaultValue = "false")
boolean isManagementMode();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 79f37b66cb..cc758ba7c9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -22,14 +22,16 @@ package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
-@ManagedObject( managesChildren = true, defaultType = "ProvidedStore")
+@ManagedObject( defaultType = "ProvidedStore")
public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
{
@@ -42,6 +44,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount";
String MODEL_VERSION = "modelVersion";
+ String ENABLED_CONNECTION_VALIDATORS = "enabledConnectionValidators";
+ String DISABLED_CONNECTION_VALIDATORS = "disabledConnectionValidators";
+ String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains";
@ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
@@ -88,6 +93,21 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
@DerivedAttribute( persist = true )
String getModelVersion();
+ @ManagedContextDefault( name = "virtualhost.enabledConnectionValidators")
+ String DEFAULT_ENABLED_VALIDATORS = "[]";
+
+ @ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}")
+ List<String> getEnabledConnectionValidators();
+
+ @ManagedContextDefault( name = "virtualhost.disabledConnectionValidators")
+ String DEFAULT_DISABLED_VALIDATORS = "[]";
+
+ @ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}")
+ List<String> getDisabledConnectionValidators();
+
+ @ManagedAttribute( defaultValue = "[]")
+ List<String> getGlobalAddressDomains();
+
@ManagedStatistic
long getQueueCount();
@@ -129,6 +149,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void delete();
+ String getRedirectHost(AmqpPort<?> port);
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
index fa35e725c9..3bc19ef7bd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import org.apache.qpid.server.store.DurableConfigurationStore;
-@ManagedObject(category=true, managesChildren=false)
+@ManagedObject(category=true, managesChildren=true)
public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
{
String QPID_INITIAL_CONFIG_VIRTUALHOST_CONFIG_VAR = "qpid.initial_config_virtualhost_config";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index eae438754b..0cbb80d722 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -80,7 +80,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
- attributes.put(NAME, _connection.getRemoteAddressString().replaceAll("/", ""));
+ attributes.put(NAME, "[" + _connection.getConnectionId() + "] " + _connection.getRemoteAddressString().replaceAll("/", ""));
attributes.put(DURABLE, false);
return attributes;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
index 631ed3e8f7..cb9727f3f6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
@@ -25,7 +25,7 @@ import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type = "GroupFile" )
+@ManagedObject( category = false, type = "GroupFile", managesChildren = true )
public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>, GroupManagingGroupProvider
{
String PATH="path";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index 19aec414de..327b7ddfe9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.group.FileGroupDatabase;
import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.util.FileHelper;
public class FileBasedGroupProviderImpl
extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl>
@@ -162,9 +164,11 @@ public class FileBasedGroupProviderImpl
{
throw new IllegalConfigurationException(String.format("Cannot create groups file at '%s'",_path));
}
+
try
{
- file.createNewFile();
+ String posixFileAttributes = getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS);
+ new FileHelper().createNewFile(file, posixFileAttributes);
}
catch (IOException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index e3ded3006d..7046f2973e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -21,14 +21,14 @@
package org.apache.qpid.server.model.adapter;
-import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +38,9 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -118,7 +121,7 @@ public class FileSystemPreferencesProviderImpl
FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
// we need to check and create file if it does not exist every time on open
- store.createIfNotExist();
+ store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
store.open();
_store = store;
_open = true;
@@ -184,6 +187,7 @@ public class FileSystemPreferencesProviderImpl
if(_store != null)
{
+ _store.close();
_store.delete();
deleted();
_authenticationProvider.setPreferencesProvider(null);
@@ -280,7 +284,7 @@ public class FileSystemPreferencesProviderImpl
else
{
FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
- store.createIfNotExist();
+ store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
store.open();
_store = store;
}
@@ -334,9 +338,9 @@ public class FileSystemPreferencesProviderImpl
{
private final ObjectMapper _objectMapper;
private final Map<String, Map<String, Object>> _preferences;
+ private final FileHelper _fileHelper;
private File _storeFile;
private FileLock _storeLock;
- private RandomAccessFile _storeRAF;
public FileSystemPreferencesStore(File preferencesFile)
{
@@ -345,9 +349,10 @@ public class FileSystemPreferencesProviderImpl
_objectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
_objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
_preferences = new TreeMap<String, Map<String, Object>>();
+ _fileHelper = new FileHelper();
}
- public void createIfNotExist()
+ public void createIfNotExist(String filePermissions)
{
if (!_storeFile.exists())
{
@@ -358,7 +363,8 @@ public class FileSystemPreferencesProviderImpl
}
try
{
- if (_storeFile.createNewFile() && !_storeFile.exists())
+ Path path = _fileHelper.createNewFile(_storeFile, filePermissions);
+ if (!Files.exists(path))
{
throw new IllegalConfigurationException(String.format("Cannot create preferences store file at '%s'", _storeFile.getAbsolutePath()));
}
@@ -391,43 +397,20 @@ public class FileSystemPreferencesProviderImpl
}
try
{
- _storeRAF = new RandomAccessFile(_storeFile, "rw");
- FileChannel fileChannel = _storeRAF.getChannel();
- try
- {
- _storeLock = fileChannel.tryLock();
- }
- catch (OverlappingFileLockException e)
- {
- _storeLock = null;
- }
- if (_storeLock == null)
+ getFileLock(_storeFile.getPath() + ".lck");
+ if (_storeFile.length() > 0)
{
- throw new IllegalConfigurationException("Cannot get lock on store file " + _storeFile.getName()
- + " is another instance running?");
- }
- long fileSize = fileChannel.size();
- if (fileSize > 0)
- {
- ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
- fileChannel.read(buffer);
- buffer.rewind();
- buffer.flip();
- byte[] data = buffer.array();
- try
- {
- Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(data,
- new TypeReference<Map<String, Map<String, Object>>>()
- {
- });
- _preferences.putAll(preferencesMap);
- }
- catch (JsonProcessingException e)
- {
- throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
- }
+ Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(_storeFile,
+ new TypeReference<Map<String, Map<String, Object>>>()
+ {
+ });
+ _preferences.putAll(preferencesMap);
}
}
+ catch (JsonProcessingException e)
+ {
+ throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
+ }
catch (IOException e)
{
throw new IllegalConfigurationException("Cannot load preferences from " + _storeFile.getName(), e);
@@ -443,6 +426,7 @@ public class FileSystemPreferencesProviderImpl
if (_storeLock != null)
{
_storeLock.release();
+ _storeLock.channel().close();
}
}
catch (IOException e)
@@ -452,22 +436,7 @@ public class FileSystemPreferencesProviderImpl
finally
{
_storeLock = null;
- try
- {
- if (_storeRAF != null)
- {
- _storeRAF.close();
- }
- }
- catch (IOException e)
- {
- LOGGER.error("Cannot close preferences file", e);
- }
- finally
- {
- _storeRAF = null;
- _preferences.clear();
- }
+ _preferences.clear();
}
}
}
@@ -544,16 +513,14 @@ public class FileSystemPreferencesProviderImpl
checkStoreOpened();
try
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- _objectMapper.writeValue(baos, _preferences);
- FileChannel channel = _storeRAF.getChannel();
- long currentSize = channel.size();
- channel.position(0);
- channel.write(ByteBuffer.wrap(baos.toByteArray()));
- if (currentSize > baos.size())
+ _fileHelper.writeFileSafely(_storeFile.toPath(), new BaseAction<File, IOException>()
{
- channel.truncate(baos.size());
- }
+ @Override
+ public void performAction(File file) throws IOException
+ {
+ _objectMapper.writeValue(file, _preferences);
+ }
+ });
}
catch (IOException e)
{
@@ -569,5 +536,32 @@ public class FileSystemPreferencesProviderImpl
}
}
+ private void getFileLock(String lockFilePath)
+ {
+ File lockFile = new File(lockFilePath);
+ try
+ {
+ lockFile.createNewFile();
+ lockFile.deleteOnExit();
+
+ @SuppressWarnings("resource")
+ FileOutputStream out = new FileOutputStream(lockFile);
+ FileChannel channel = out.getChannel();
+ _storeLock = channel.tryLock();
+ }
+ catch (IOException ioe)
+ {
+ throw new IllegalStateException("Cannot create the lock file " + lockFile.getName(), ioe);
+ }
+ catch(OverlappingFileLockException e)
+ {
+ _storeLock = null;
+ }
+
+ if(_storeLock == null)
+ {
+ throw new IllegalStateException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?");
+ }
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
new file mode 100644
index 0000000000..11f8944863
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConnectionValidator.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+
+public interface ConnectionValidator extends Pluggable
+{
+ boolean validateConnectionCreation(AMQConnectionModel<?, ?> connection);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
new file mode 100644
index 0000000000..372642310e
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import java.util.List;
+
+import org.apache.qpid.server.filter.MessageFilter;
+
+public interface MessageFilterFactory extends Pluggable
+{
+ MessageFilter newInstance(List<String> arguments);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
index f70afb12ba..e6100efda7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
@@ -19,8 +19,11 @@
package org.apache.qpid.server.plugin;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.ServiceLoader;
import org.apache.log4j.Logger;
@@ -47,6 +50,16 @@ public class QpidServiceLoader
return instancesOf(clazz, true);
}
+ public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz)
+ {
+ Map<String,C> instances = new HashMap<>();
+ for(C instance : instancesOf(clazz))
+ {
+ instances.put(instance.getType(), instance);
+ }
+ return Collections.unmodifiableMap(instances);
+ }
+
private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne)
{
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 60999fb2be..639d569e8f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -52,6 +54,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -75,6 +78,8 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
@@ -186,6 +191,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private MessageDurability _messageDurability;
+ @ManagedAttributeField
+ private Map<String, Map<String,List<String>>> _defaultFilters;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -241,12 +249,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private long _minimumMessageTtl;
@ManagedAttributeField
private long _maximumMessageTtl;
+ @ManagedAttributeField
+ private boolean _ensureNondestructiveConsumers;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
private boolean _closing;
+ private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -283,11 +294,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
});
}
- if (isDurable())
- {
- _virtualHost.getDurableConfigurationStore().create(asObjectRecord());
- }
- else if(getMessageDurability() != MessageDurability.NEVER)
+ if(!isDurable() && getMessageDurability() != MessageDurability.NEVER)
{
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
new PrivilegedAction<Object>()
@@ -351,17 +358,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
case PRINCIPAL:
_exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
- if(isDurable())
- {
- _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
- }
break;
case CONTAINER:
_exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
- if(isDurable())
- {
- _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
- }
break;
case CONNECTION:
_exclusiveOwner = sessionModel.getConnectionModel();
@@ -450,6 +449,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
+
+ if(_defaultFilters != null)
+ {
+ QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
+ final Map<String, MessageFilterFactory> messageFilterFactories =
+ qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
+
+ for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet())
+ {
+ String name = String.valueOf(entry.getKey());
+ Map<String, List<String>> filterValue = entry.getValue();
+ if(filterValue.size() == 1)
+ {
+ String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
+ MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+ if(filterFactory != null)
+ {
+ List<String> filterArguments = filterValue.values().iterator().next();
+ _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet());
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue);
+
+ }
+
+ }
+ }
+
updateAlertChecks();
}
@@ -555,6 +588,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public Map<String, Map<String, List<String>>> getDefaultFilters()
+ {
+ return _defaultFilters;
+ }
+
+ @Override
public final MessageDurability getMessageDurability()
{
return _messageDurability;
@@ -573,6 +612,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public boolean isEnsureNondestructiveConsumers()
+ {
+ return _ensureNondestructiveConsumers;
+ }
+
+
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -603,7 +650,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- final FilterManager filters,
+ FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
EnumSet<ConsumerImpl.Option> optionSet)
@@ -699,6 +746,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
throw new ExistingConsumerPreventsExclusive();
}
+ if(!_defaultFiltersMap.isEmpty())
+ {
+ if(filters == null)
+ {
+ filters = new FilterManager();
+ }
+ for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+ {
+ if(!filters.hasFilter(filter.getKey()))
+ {
+ filters.add(filter.getKey(), filter.getValue());
+ }
+ }
+ }
+
+ if(_ensureNondestructiveConsumers)
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+ }
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 49732e8345..367a12057d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -62,11 +62,16 @@ public class QueueArgumentsConverter
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+
+ public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
public static final String QPID_NO_LOCAL = "no-local";
+
static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+
static
{
ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
@@ -99,6 +104,8 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
+ ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
+ ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 452c5ff14f..917c951b6d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -371,11 +371,16 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- private void dequeue()
+ private boolean dequeue()
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ while(state.getState() == State.ACQUIRED && !_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ {
+ state = _state;
+ }
+
+ if(state.getState() == State.ACQUIRED)
{
if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
{
@@ -387,7 +392,11 @@ public abstract class QueueEntryImpl implements QueueEntry
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
}
-
+ return true;
+ }
+ else
+ {
+ return false;
}
}
@@ -420,9 +429,10 @@ public abstract class QueueEntryImpl implements QueueEntry
public void delete()
{
- dequeue();
-
- dispose();
+ if(dequeue())
+ {
+ dispose();
+ }
}
public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
index 0607f4b3d3..8b6a83d443 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
@@ -62,7 +62,7 @@ public interface FileKeyStore<X extends FileKeyStore<X>> extends KeyStore<X>
@ManagedAttribute(defaultValue = "${this:path}")
String getDescription();
- @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT)
+ @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
String getStoreUrl();
@DerivedAttribute
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java
index 78509182b5..f239b83f27 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java
@@ -31,7 +31,7 @@ public interface NonJavaKeyStore<X extends NonJavaKeyStore<X>> extends KeyStore<
@ManagedAttribute(defaultValue = "${this:subjectName}")
String getDescription();
- @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )
+ @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
String getPrivateKeyUrl();
@ManagedAttribute( mandatory = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
index cb5bc54cd2..2c692ddf4d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.security.auth.database;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.AccountNotFoundException;
@@ -36,7 +38,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -45,9 +46,9 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr
protected static final String DEFAULT_ENCODING = "utf-8";
private final Pattern _regexp = Pattern.compile(":");
- private final Map<String, U> _userMap = new HashMap<String, U>();
+ private final Map<String, U> _userMap = new HashMap<>();
private final ReentrantLock _userUpdate = new ReentrantLock();
- private final Random _random = new Random();
+ private final FileHelper _fileHelper = new FileHelper();
private File _passwordFile;
public final void open(File passwordFile) throws IOException
@@ -181,7 +182,7 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr
try
{
_userUpdate.lock();
- final Map<String, U> newUserMap = new HashMap<String, U>();
+ final Map<String, U> newUserMap = new HashMap<>();
BufferedReader reader = null;
try
@@ -224,71 +225,33 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr
protected abstract Logger getLogger();
- protected File createTempFileOnSameFilesystem()
- {
- File liveFile = _passwordFile;
- File tmp;
-
- do
- {
- tmp = new File(liveFile.getPath() + _random.nextInt() + ".tmp");
- }
- while(tmp.exists());
- tmp.deleteOnExit();
- return tmp;
- }
-
- protected void swapTempFileToLive(final File temp) throws IOException
+ protected void savePasswordFile() throws IOException
{
- File live = _passwordFile;
- // Remove any existing ".old" file
- final File old = new File(live.getAbsoluteFile() + ".old");
- if (old.exists())
+ try
{
- old.delete();
- }
+ _userUpdate.lock();
- // Create an new ".old" file
- if(!live.renameTo(old))
- {
- //unable to rename the existing file to the backup name
- getLogger().error("Could not backup the existing password file");
- throw new IOException("Could not backup the existing password file");
+ _fileHelper.writeFileSafely(_passwordFile.toPath(), new BaseAction<File,IOException>()
+ {
+ @Override
+ public void performAction(File file) throws IOException
+ {
+ writeToFile(file);
+ }
+ });
}
-
- // Move temp file to be the new "live" file
- if(!temp.renameTo(live))
+ finally
{
- //failed to rename the new file to the required filename
- if(!old.renameTo(live))
- {
- //unable to return the backup to required filename
- getLogger().error(
- "Could not rename the new password file into place, and unable to restore original file");
- throw new IOException("Could not rename the new password file into place, and unable to restore original file");
- }
-
- getLogger().error("Could not rename the new password file into place");
- throw new IOException("Could not rename the new password file into place");
+ _userUpdate.unlock();
}
}
- protected void savePasswordFile() throws IOException
+ private void writeToFile(File tmp) throws IOException
{
- try
- {
- _userUpdate.lock();
-
- BufferedReader reader = null;
- PrintStream writer = null;
-
- File tmp = createTempFileOnSameFilesystem();
-
- try
+ try(PrintStream writer = new PrintStream(tmp);
+ BufferedReader reader = new BufferedReader(new FileReader(_passwordFile)))
{
- writer = new PrintStream(tmp);
- reader = new BufferedReader(new FileReader(_passwordFile));
String line;
while ((line = reader.readLine()) != null)
@@ -346,32 +309,6 @@ public abstract class AbstractPasswordFilePrincipalDatabase<U extends PasswordPr
getLogger().error("Unable to create the new password file: " + e);
throw new IOException("Unable to create the new password file",e);
}
- finally
- {
-
- try
- {
- if (reader != null)
- {
- reader.close();
- }
- }
- finally
- {
- if (writer != null)
- {
- writer.close();
- }
- }
-
- }
-
- swapTempFileToLive(tmp);
- }
- finally
- {
- _userUpdate.unlock();
- }
}
protected abstract U createUserFromPassword(Principal principal, char[] passwd);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
index 6fe649ff04..0b5e17306c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/Base64MD5PasswordDatabaseAuthenticationManager.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-@ManagedObject( category = false, type = "Base64MD5PasswordFile" )
+@ManagedObject( category = false, managesChildren = true, type = "Base64MD5PasswordFile" )
public class Base64MD5PasswordDatabaseAuthenticationManager
extends PrincipalDatabaseAuthenticationManager<Base64MD5PasswordDatabaseAuthenticationManager>
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
index e6d2fcf44c..b86bfac0ad 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PlainPasswordDatabaseAuthenticationManager.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-@ManagedObject( category = false, type = "PlainPasswordFile" )
+@ManagedObject( category = false, managesChildren = true, type = "PlainPasswordFile" )
public class PlainPasswordDatabaseAuthenticationManager extends PrincipalDatabaseAuthenticationManager<PlainPasswordDatabaseAuthenticationManager>
{
public static final String PROVIDER_TYPE = "PlainPasswordFile";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index d3c9635502..cf165ff4af 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.security.auth.manager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.security.AccessControlException;
import java.security.Principal;
import java.util.Collection;
@@ -40,6 +42,7 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -56,6 +59,7 @@ import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.util.FileHelper;
public abstract class PrincipalDatabaseAuthenticationManager<T extends PrincipalDatabaseAuthenticationManager<T>>
extends AbstractAuthenticationManager<T>
@@ -96,7 +100,11 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
try
{
- passwordFile.createNewFile();
+ Path path = new FileHelper().createNewFile(passwordFile, getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
+ if (!Files.exists(path))
+ {
+ throw new IllegalConfigurationException(String.format("Cannot create password file at '%s'", _path));
+ }
}
catch (IOException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
index 0a02ce38fc..1e9e646e35 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupDatabase.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
/**
@@ -232,9 +234,9 @@ public class FileGroupDatabase implements GroupDatabase
}
}
- private synchronized void writeGroupFile(String groupFile) throws IOException
+ private synchronized void writeGroupFile(final String groupFile) throws IOException
{
- Properties propertiesFile = new Properties();
+ final Properties propertiesFile = new Properties();
for (String group : _groupToUserMap.keySet())
{
@@ -244,19 +246,19 @@ public class FileGroupDatabase implements GroupDatabase
propertiesFile.setProperty(group + ".users", userList);
}
- String comment = "Written " + new Date();
- FileOutputStream fileOutputStream = new FileOutputStream(groupFile);
- try
- {
- propertiesFile.store(fileOutputStream, comment);
- }
- finally
+
+ new FileHelper().writeFileSafely(new File(groupFile).toPath(), new BaseAction<File, IOException>()
{
- if(fileOutputStream != null)
+ @Override
+ public void performAction(File file) throws IOException
{
- fileOutputStream.close();
+ String comment = "Written " + new Date();
+ try(FileOutputStream fileOutputStream = new FileOutputStream(file))
+ {
+ propertiesFile.store(fileOutputStream, comment);
+ }
}
- }
+ });
}
private void validatePropertyNameIsGroupName(String propertyName)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 0de6543713..ad671d7e99 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -40,6 +42,9 @@ import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.Version;
@@ -85,6 +90,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>();
private final ObjectMapper _objectMapper = new ObjectMapper();
private final Class<? extends ConfiguredObject> _rootClass;
+ private final FileHelper _fileHelper;
private Map<String,Class<? extends ConfiguredObject>> _classNameMapping;
private String _directoryName;
@@ -123,6 +129,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
_objectMapper.registerModule(_module);
_objectMapper.enable(SerializationConfig.Feature.INDENT_OUTPUT);
_rootClass = rootClass;
+ _fileHelper = new FileHelper();
}
@Override
@@ -173,7 +180,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
_directoryName = fileFromSettings.getParent();
_configFileName = fileFromSettings.getName();
_backupFileName = fileFromSettings.getName() + ".bak";
- _tempFileName = fileFromSettings.getName() + ".tmp";;
+ _tempFileName = fileFromSettings.getName() + ".tmp";
_lockFileName = fileFromSettings.getName() + ".lck";
}
@@ -191,56 +198,45 @@ public class JsonFileConfigStore implements DurableConfigurationStore
checkDirectoryIsWritable(_directoryName);
getFileLock();
- if(!fileExists(_configFileName))
+ Path storeFile = new File(_directoryName, _configFileName).toPath();
+ Path backupFile = new File(_directoryName, _backupFileName).toPath();
+ if(!Files.exists(storeFile))
{
- if(!fileExists(_backupFileName))
+ if(!Files.exists(backupFile))
{
- File newFile = new File(_directoryName, _configFileName);
try
{
- _objectMapper.writeValue(newFile, Collections.emptyMap());
+ String posixFileAttributes = _parent.getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS);
+ storeFile = _fileHelper.createNewFile(storeFile, posixFileAttributes);
+ _objectMapper.writeValue(storeFile.toFile(), Collections.emptyMap());
}
catch (IOException e)
{
- throw new StoreException("Could not write configuration file " + newFile, e);
+ throw new StoreException("Could not write configuration file " + storeFile, e);
}
}
else
{
- renameFile(_backupFileName, _configFileName);
+ try
+ {
+ _fileHelper.atomicFileMoveOrReplace(backupFile, storeFile);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Could not move backup to configuration file " + storeFile, e);
+ }
}
}
- deleteFileIfExists(_backupFileName);
- }
-
- private void renameFile(String fromFileName, String toFileName)
- {
- File toFile = deleteFileIfExists(toFileName);
- File fromFile = new File(_directoryName, fromFileName);
- if(!fromFile.renameTo(toFile))
+ try
{
- throw new StoreException("Cannot rename file " + fromFile.getAbsolutePath() + " to " + toFile.getAbsolutePath());
+ Files.deleteIfExists(backupFile);
}
- }
-
- private File deleteFileIfExists(final String toFileName)
- {
- File toFile = new File(_directoryName, toFileName);
- if(toFile.exists())
+ catch (IOException e)
{
- if(!toFile.delete())
- {
- throw new StoreException("Cannot delete file " + toFile.getAbsolutePath());
- }
+ throw new StoreException("Could not delete backup file " + backupFile, e);
}
- return toFile;
- }
- private boolean fileExists(String fileName)
- {
- File file = new File(_directoryName, fileName);
- return file.exists();
}
private void getFileLock()
@@ -396,7 +392,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
private void save()
{
UUID rootId = getRootId();
- Map<String, Object> data = null;
+ final Map<String, Object> data;
if (rootId == null)
{
@@ -409,15 +405,18 @@ public class JsonFileConfigStore implements DurableConfigurationStore
try
{
- deleteFileIfExists(_tempFileName);
- deleteFileIfExists(_backupFileName);
-
- File tmpFile = new File(_directoryName, _tempFileName);
- _objectMapper.writeValue(tmpFile, data);
- renameFile(_configFileName, _backupFileName);
- renameFile(tmpFile.getName(),_configFileName);
- tmpFile.delete();
- deleteFileIfExists(_backupFileName);
+ Path tmpFile = new File(_directoryName, _tempFileName).toPath();
+ _fileHelper.writeFileSafely( new File(_directoryName, _configFileName).toPath(),
+ new File(_directoryName, _backupFileName).toPath(),
+ tmpFile,
+ new BaseAction<File, IOException>()
+ {
+ @Override
+ public void performAction(File file) throws IOException
+ {
+ _objectMapper.writeValue(file, data);
+ }
+ });
}
catch (IOException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 10d8a5d61c..c59fd821c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -30,14 +32,19 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.util.Action;
public class VirtualHostStoreUpgraderAndRecoverer
{
@@ -509,12 +516,100 @@ public class VirtualHostStoreUpgraderAndRecoverer
}
- public void perform(DurableConfigurationStore durableConfigurationStore)
+ public void perform(final DurableConfigurationStore durableConfigurationStore)
{
String virtualHostCategory = VirtualHost.class.getSimpleName();
GenericStoreUpgrader upgraderHandler = new GenericStoreUpgrader(virtualHostCategory, VirtualHost.MODEL_VERSION, durableConfigurationStore, _upgraders);
upgraderHandler.upgrade();
new GenericRecoverer(_virtualHostNode).recover(upgraderHandler.getRecords());
+
+ final StoreConfigurationChangeListener configChangeListener = new StoreConfigurationChangeListener(durableConfigurationStore);
+ if(_virtualHostNode.getVirtualHost() != null)
+ {
+ applyRecursively(_virtualHostNode.getVirtualHost(), new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
+ {
+ object.addChangeListener(configChangeListener);
+ }
+ });
+ }
+ _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
+ {
+
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ if(child instanceof VirtualHost)
+ {
+ applyRecursively(child, new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> object)
+ {
+ if(object.isDurable())
+ {
+ durableConfigurationStore.update(true, object.asObjectRecord());
+ object.addChangeListener(configChangeListener);
+ }
+ }
+ });
+
+ }
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ if(child instanceof VirtualHost)
+ {
+ child.removeChangeListener(configChangeListener);
+ }
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ });
+ }
+
+ private void applyRecursively(final ConfiguredObject<?> object, final Action<ConfiguredObject<?>> action)
+ {
+ applyRecursively(object, action, new HashSet<ConfiguredObject<?>>());
+ }
+
+ private void applyRecursively(final ConfiguredObject<?> object,
+ final Action<ConfiguredObject<?>> action,
+ final HashSet<ConfiguredObject<?>> visited)
+ {
+ if(!visited.contains(object))
+ {
+ visited.add(object);
+ action.performAction(object);
+ for(Class<? extends ConfiguredObject> childClass : object.getModel().getChildTypes(object.getCategoryClass()))
+ {
+ Collection<? extends ConfiguredObject> children = object.getChildren(childClass);
+ if(children != null)
+ {
+ for(ConfiguredObject<?> child : children)
+ {
+ applyRecursively(child, action, visited);
+ }
+ }
+ }
+ }
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index 8d35e1a94f..d2351b5500 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -36,8 +36,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.LoggerFactory;
+
+
public class SelectorThread extends Thread
{
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+
public static final String IO_THREAD_NAME_PREFIX = "NCS-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
@@ -165,7 +170,8 @@ public class SelectorThread extends Thread
NonBlockingConnection connection = iterator.next();
int period = connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
+
+ if (period <= 0 || connection.isStateChanged())
{
toBeScheduled.add(connection);
connection.getSocketChannel().register(_selector, 0).cancel();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
index 0d53b4d03b..715709a1b2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.util;
-public interface Action<T>
+public interface Action<T> extends BaseAction<T, RuntimeException>
{
void performAction(T object);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java
new file mode 100644
index 0000000000..f7dbcb4d3c
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/BaseAction.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface BaseAction<T, E extends Exception>
+{
+ void performAction(T object) throws E;
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java
new file mode 100644
index 0000000000..0e1a28f220
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FileHelper.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFileAttributeView;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+
+public class FileHelper
+{
+
+ public void writeFileSafely(Path targetFile, BaseAction<File, IOException> operation) throws IOException
+ {
+ String name = targetFile.toFile().getName();
+ writeFileSafely(targetFile,
+ targetFile.resolveSibling(name + ".bak"),
+ targetFile.resolveSibling(name + ".tmp"),
+ operation);
+ }
+
+ public void writeFileSafely(Path targetFile, Path backupFile, Path tmpFile, BaseAction<File, IOException> write) throws IOException
+ {
+ Files.deleteIfExists(tmpFile);
+ Files.deleteIfExists(backupFile);
+
+ Set<PosixFilePermission> permissions = null;
+ if (Files.exists(targetFile) && isPosixFileSystem(targetFile))
+ {
+ permissions = Files.getPosixFilePermissions(targetFile);
+ }
+
+ tmpFile = createNewFile(tmpFile, permissions);
+
+ write.performAction(tmpFile.toFile());
+
+ atomicFileMoveOrReplace(targetFile, backupFile);
+
+ if (permissions != null)
+ {
+ Files.setPosixFilePermissions(backupFile, permissions);
+ }
+
+ atomicFileMoveOrReplace(tmpFile, targetFile);
+
+ Files.deleteIfExists(tmpFile);
+ Files.deleteIfExists(backupFile);
+ }
+
+ public Path createNewFile(File newFile, String posixFileAttributes) throws IOException
+ {
+ return createNewFile(newFile.toPath(), posixFileAttributes);
+ }
+
+ public Path createNewFile(Path newFile, String posixFileAttributes) throws IOException
+ {
+ Set<PosixFilePermission> permissions = posixFileAttributes == null ? null : PosixFilePermissions.fromString(posixFileAttributes);
+ return createNewFile(newFile, permissions );
+ }
+
+ public Path createNewFile(Path newFile, Set<PosixFilePermission> permissions) throws IOException
+ {
+ if (!Files.exists(newFile))
+ {
+ newFile = Files.createFile(newFile);
+ }
+
+ if (permissions != null && isPosixFileSystem(newFile))
+ {
+ Files.setPosixFilePermissions(newFile, permissions);
+ }
+
+ return newFile;
+ }
+
+ public boolean isPosixFileSystem(Path path) throws IOException
+ {
+ while (!Files.exists(path))
+ {
+ path = path.getParent();
+
+ if (path == null)
+ {
+ return false;
+ }
+ }
+ return Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class);
+ }
+
+ public Path atomicFileMoveOrReplace(Path sourceFile, Path targetFile) throws IOException
+ {
+ try
+ {
+ return Files.move(sourceFile, targetFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+ }
+ catch(AtomicMoveNotSupportedException e)
+ {
+ if (sourceFile.toFile().renameTo(targetFile.toFile()))
+ {
+ return targetFile;
+ }
+ else
+ {
+ throw new RuntimeException("Atomic move is unsupported and rename from : '"
+ + sourceFile + "' to: '" + targetFile + "' failed.");
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index dce902b126..06917f0161 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -63,6 +63,8 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -75,7 +77,6 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -94,6 +95,8 @@ import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener
{
+ private final Collection<ConnectionValidator> _connectionValidators = new ArrayList<>();
+
private static enum BlockingType { STORE, FILESYSTEM };
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
@@ -162,6 +165,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@ManagedAttributeField
private int _housekeepingThreadCount;
+ @ManagedAttributeField
+ private List<String> _enabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _disabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _globalAddressDomains;
private boolean _useAsyncRecoverer;
@@ -212,6 +223,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
+ if(getGlobalAddressDomains() != null)
+ {
+ for(String domain : getGlobalAddressDomains())
+ {
+ validateGlobalAddressDomain(domain);
+ }
+ }
}
@Override
@@ -230,6 +248,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
throw new IntegrityViolationException("Cannot delete default virtual host '" + getName() + "'");
}
}
+ if(changedAttributes.contains(GLOBAL_ADDRESS_DOMAINS))
+ {
+ VirtualHost<?, ?, ?> virtualHost = (VirtualHost<?, ?, ?>) proxyForValidation;
+ if(virtualHost.getGlobalAddressDomains() != null)
+ {
+ for(String name : virtualHost.getGlobalAddressDomains())
+ {
+ validateGlobalAddressDomain(name);
+ }
+ }
+ }
+ }
+
+ private void validateGlobalAddressDomain(final String name)
+ {
+ String regex = "/(/?)([\\w_\\-:.\\$]+/)*[\\w_\\-:.\\$]+";
+ if(!name.matches(regex))
+ {
+ throw new IllegalArgumentException("'"+name+"' is not a valid global address domain");
+ }
}
@Override
@@ -243,8 +281,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
super.validateOnCreate();
validateMessageStoreCreation();
+ if(getGlobalAddressDomains() != null)
+ {
+ for(String name : getGlobalAddressDomains())
+ {
+ validateGlobalAddressDomain(name);
+ }
+ }
}
+
+
private void validateMessageStoreCreation()
{
MessageStore store = createMessageStore();
@@ -293,10 +340,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- addChangeListener(new StoreUpdatingChangeListener());
+ _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
- _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
+ QpidServiceLoader serviceLoader = new QpidServiceLoader();
+ for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
+ {
+ if((_enabledConnectionValidators.isEmpty()
+ && (_disabledConnectionValidators.isEmpty()) || !_disabledConnectionValidators.contains(validator.getType()))
+ || _enabledConnectionValidators.contains(validator.getType()))
+ {
+ _connectionValidators.add(validator);
+ }
+
+ }
}
private void checkVHostStateIsActive()
@@ -438,6 +495,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _eventLogger;
}
+ @Override
+ public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ {
+ getSecurityManager().authoriseCreateConnection(connection);
+ for(ConnectionValidator validator : _connectionValidators)
+ {
+ if(!validator.validateConnectionCreation(connection))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
* and checking for idle or open transactions that have exceeded the permitted thresholds.
@@ -526,9 +597,42 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public List<String> getEnabledConnectionValidators()
+ {
+ return _enabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getDisabledConnectionValidators()
+ {
+ return _disabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getGlobalAddressDomains()
+ {
+ return _globalAddressDomains;
+ }
+
+ @Override
public AMQQueue<?> getQueue(String name)
{
- return (AMQQueue<?>) getChildByName(Queue.class, name);
+ AMQQueue<?> childByName = (AMQQueue<?>) getChildByName(Queue.class, name);
+ if(childByName == null && getGlobalAddressDomains() != null)
+ {
+ for(String domain : getGlobalAddressDomains())
+ {
+ if(name.startsWith(domain + "/"))
+ {
+ childByName = (AMQQueue<?>) getChildByName(Queue.class,name.substring(domain.length()));
+ if(childByName != null)
+ {
+ break;
+ }
+ }
+ }
+ }
+ return childByName;
}
@Override
@@ -556,14 +660,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
int purged = queue.deleteAndReturnCount();
- if (queue.isDurable() && !(queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_SESSION_END))
- {
- DurableConfigurationStore store = getDurableConfigurationStore();
- store.remove(queue.asObjectRecord());
- }
return purged;
}
@@ -614,7 +710,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public ExchangeImpl getExchange(String name)
{
- return getChildByName(ExchangeImpl.class,name);
+ ExchangeImpl childByName = getChildByName(ExchangeImpl.class, name);
+ if(childByName == null && getGlobalAddressDomains() != null)
+ {
+ for(String domain : getGlobalAddressDomains())
+ {
+ if(name.startsWith(domain + "/"))
+ {
+ childByName = getChildByName(ExchangeImpl.class,name.substring(domain.length()));
+ if(childByName != null)
+ {
+ break;
+ }
+ }
+ }
+ }
+ return childByName;
}
@Override
@@ -671,6 +782,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
exchange.deleteWithChecks();
}
+ @Override
+ public String getLocalAddress(final String routingAddress)
+ {
+ String localAddress = routingAddress;
+ if(getGlobalAddressDomains() != null)
+ {
+ for(String domain : getGlobalAddressDomains())
+ {
+ if(localAddress.length() > routingAddress.length() - domain.length() && routingAddress.startsWith(domain + "/"))
+ {
+ localAddress = routingAddress.substring(domain.length());
+ }
+ }
+ }
+ return localAddress;
+ }
+
public SecurityManager getSecurityManager()
{
return _broker.getSecurityManager();
@@ -886,6 +1014,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return null;
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
@@ -1390,14 +1524,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return total;
}
- @Override
- protected void onCreate()
- {
- super.onCreate();
- ConfiguredObjectRecord record = asObjectRecord();
- getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
- }
-
@StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
private void onActivate()
{
@@ -1509,44 +1635,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
onActivate();
}
- private class StoreUpdatingChangeListener implements ConfigurationChangeListener
- {
- @Override
- public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
- {
- if (object == AbstractVirtualHost.this && isDurable() && newState == State.DELETED)
- {
- getDurableConfigurationStore().remove(asObjectRecord());
- object.removeChangeListener(this);
- }
- }
-
- @Override
- public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
- {
-
- }
-
- @Override
- public void attributeSet(final ConfiguredObject<?> object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
- if (object == AbstractVirtualHost.this && isDurable() && getState() != State.DELETED && isAttributePersisted(attributeName)
- && !(attributeName.equals(VirtualHost.DESIRED_STATE) && newAttributeValue.equals(State.DELETED)))
- {
- getDurableConfigurationStore().update(false, asObjectRecord());
- }
- }
- }
-
private class FileSystemSpaceChecker extends HouseKeepingTask
{
private boolean _fileSystemFull;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 29729b6c7d..dff8a80dd9 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
@@ -108,4 +109,7 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
EventLogger getEventLogger();
+ boolean authoriseCreateConnection(AMQConnectionModel<?, ?> connection);
+
+ String getLocalAddress(String routingAddress);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
new file mode 100644
index 0000000000..5e87b2e511
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.NonStandardVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>>
+ extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
+ NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>>
+{
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
new file mode 100644
index 0000000000..cacc981e9b
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
@@ -0,0 +1,517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+
+@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false )
+class RedirectingVirtualHostImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostImpl>
+ implements RedirectingVirtualHost<RedirectingVirtualHostImpl>
+{
+ public static final String TYPE = "REDIRECTOR";
+ private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ @ManagedAttributeField
+ private boolean _queue_deadLetterQueueEnabled;
+
+ @ManagedAttributeField
+ private long _housekeepingCheckPeriod;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutWarn;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutWarn;
+ @ManagedAttributeField
+ private int _housekeepingThreadCount;
+
+ @ManagedAttributeField
+ private List<String> _enabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _disabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _globalAddressDomains;
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
+ {
+ super(parentsMap(virtualHostNode), attributes);
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ setState(State.UNAVAILABLE);
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public String getModelVersion()
+ {
+ return BrokerModel.MODEL_VERSION;
+ }
+
+ @Override
+ protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl createExchange(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void removeExchange(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> createQueue(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void executeTransaction(final TransactionalOperation op)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public Collection<String> getExchangeTypeNames()
+ {
+ return getObjectFactory().getSupportedTypes(Exchange.class);
+ }
+
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port);
+ }
+
+ @Override
+ public boolean isQueue_deadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public long getHousekeepingCheckPeriod()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHousekeepingThreadCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getQueueCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExchangeCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getConnectionCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public Collection<VirtualHostAlias> getAliases()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<Connection> getConnections()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue<?>> getQueues()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public int removeQueue(final AMQQueue<?> queue)
+ {
+ throwUnsupportedForRedirector();
+ return 0;
+ }
+
+ @Override
+ public Collection<ExchangeImpl<?>> getExchanges()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageDestination getDefaultDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return null;
+ }
+
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return 0l;
+ }
+
+ @Override
+ public org.apache.qpid.server.security.SecurityManager getSecurityManager()
+ {
+ return null;
+ }
+
+ @Override
+ public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task)
+ {
+ }
+
+ @Override
+ public long getHouseKeepingTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getHouseKeepingCompletedTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHouseKeepingPoolSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setHouseKeepingPoolSize(final int newSize)
+ {
+ }
+
+ @Override
+ public int getHouseKeepingActiveCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public DtxRegistry getDtxRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public LinkRegistry getLinkRegistry(final String remoteContainerId)
+ {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public boolean getDefaultDeadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public EventLogger getEventLogger()
+ {
+ return null;
+ }
+
+ @Override
+ public void registerMessageReceived(final long messageSize, final long timestamp)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public void registerMessageDelivered(final long messageSize)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ }
+
+ @Override
+ public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ {
+ return false;
+ }
+
+ @Override
+ public List<String> getEnabledConnectionValidators()
+ {
+ return _enabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getDisabledConnectionValidators()
+ {
+ return _disabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getGlobalAddressDomains()
+ {
+ return _globalAddressDomains;
+ }
+
+ @Override
+ public String getLocalAddress(final String routingAddress)
+ {
+ String localAddress = routingAddress;
+ if(getGlobalAddressDomains() != null)
+ {
+ for(String domain : getGlobalAddressDomains())
+ {
+ if(localAddress.length() > routingAddress.length() - domain.length() && routingAddress.startsWith(domain + "/"))
+ {
+ localAddress = routingAddress.substring(domain.length());
+ }
+ }
+ }
+ return localAddress;
+ }
+
+ private void throwUnsupportedForRedirector()
+ {
+ throw new IllegalStateException("The virtual host state of " + getState()
+ + " does not permit this operation.");
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
new file mode 100644
index 0000000000..636681db72
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHostNode;
+
+@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()")
+public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X>
+{
+
+ @ManagedAttribute( defaultValue = "{}")
+ Map<Port<?>, String> getRedirects();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
new file mode 100644
index 0000000000..c94d113514
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+
+
+public class RedirectingVirtualHostNodeImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class);
+ public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector";
+
+
+ @ManagedAttributeField
+ private String _virtualHostInitialConfiguration;
+
+ @ManagedAttributeField
+ private Map<Port<?>,String> _redirects;
+
+ private RedirectingVirtualHostImpl _virtualHost;
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent)
+ {
+ super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent),
+ attributes);
+ }
+
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ protected void doActivate()
+ {
+ try
+ {
+ _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this);
+ _virtualHost.create();
+ setState(State.ACTIVE);
+ }
+ catch(RuntimeException e)
+ {
+ setState(State.ERRORED);
+ if (getParent(Broker.class).isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public String getVirtualHostInitialConfiguration()
+ {
+ return _virtualHostInitialConfiguration;
+ }
+
+ @Override
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ public DurableConfigurationStore getConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Map<Port<?>, String> getRedirects()
+ {
+ return _redirects;
+ }
+
+ public static Map<String, Collection<String>> getSupportedChildTypes()
+ {
+ Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE);
+ return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/resources/system.properties b/qpid/java/broker-core/src/main/resources/system.properties
deleted file mode 100644
index 661b0cba77..0000000000
--- a/qpid/java/broker-core/src/main/resources/system.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
index 8c115c6e62..2846950bc1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
@@ -309,4 +309,14 @@ public class BrokerOptionsTest extends QpidTestCase
assertEquals("unexpected number of entries", 2, props.keySet().size());
assertEquals("value", props.get("name"));
}
+
+
+ public void testSetInitialSystemProperties()
+ {
+ assertNull("Unexpected default value for initial system properties", _options.getInitialSystemProperties());
+
+ _options.setInitialSystemProperties("test.properties");
+
+ assertEquals("test.properties", _options.getInitialSystemProperties());
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java
new file mode 100644
index 0000000000..195414c7d7
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/BrokerTest.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class BrokerTest extends QpidTestCase
+{
+ private static final String INITIAL_SYSTEM_PROPERTY = "test";
+ private static final String INITIAL_SYSTEM_PROPERTY_VALUE = "testValue";
+
+ private File _initialSystemProperties;
+ private File _initialConfiguration;
+ private File _brokerWork;
+ private Broker _broker;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // create empty initial configuration
+ Map<String,Object> initialConfig = new HashMap<>();
+ initialConfig.put(ConfiguredObject.NAME, "test");
+ initialConfig.put(org.apache.qpid.server.model.Broker.MODEL_VERSION, BrokerModel.MODEL_VERSION);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String config = mapper.writeValueAsString(initialConfig);
+ _initialConfiguration = TestFileUtils.createTempFile(this, ".initial-config.json", config);
+ _brokerWork = TestFileUtils.createTestDirectory("qpid-work", true);
+ _initialSystemProperties = TestFileUtils.createTempFile(this, ".initial-system.properties",
+ INITIAL_SYSTEM_PROPERTY + "=" + INITIAL_SYSTEM_PROPERTY_VALUE
+ + "\nQPID_WORK=" + _brokerWork.getAbsolutePath() + "_test");
+ setTestSystemProperty("QPID_WORK", _brokerWork.getAbsolutePath());
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_broker != null)
+ {
+ _broker.shutdown();
+ }
+ System.clearProperty(INITIAL_SYSTEM_PROPERTY);
+ FileUtils.delete(_brokerWork, true);
+ FileUtils.delete(_initialSystemProperties, false);
+ FileUtils.delete(_initialConfiguration, false);
+ }
+ }
+
+ public void testInitialSystemPropertiesAreSetOnBrokerStartup() throws Exception
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setInitialSystemProperties(_initialSystemProperties.getAbsolutePath());
+ options.setSkipLoggingConfiguration(true);
+ options.setStartupLoggedToSystemOut(true);
+ options.setInitialConfigurationLocation(_initialConfiguration.getAbsolutePath());
+ _broker = new Broker();
+ _broker.startup(options);
+
+ // test JVM system property should be set from initial system config file
+ assertEquals("Unexpected JVM system property", INITIAL_SYSTEM_PROPERTY_VALUE, System.getProperty(INITIAL_SYSTEM_PROPERTY));
+
+ // existing system property should not be overridden
+ assertEquals("Unexpected QPID_WORK system property", _brokerWork.getAbsolutePath(), System.getProperty("QPID_WORK"));
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
index 93fa9114fb..d004f16466 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/binding/BindingImplTest.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
public class BindingImplTest extends QpidTestCase
@@ -57,7 +59,11 @@ public class BindingImplTest extends QpidTestCase
attributes.put(Binding.ARGUMENTS, arguments);
attributes.put(Binding.NAME, getTestName());
AMQQueue queue = mock(AMQQueue.class);
+ VirtualHostImpl vhost = mock(VirtualHostImpl.class);
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(vhost.getSecurityManager()).thenReturn(securityManager);
when(queue.getTaskExecutor()).thenReturn(_taskExecutor);
+ when(queue.getVirtualHost()).thenReturn(vhost);
when(queue.getModel()).thenReturn(_model);
ExchangeImpl exchange = mock(ExchangeImpl.class);
when(exchange.getTaskExecutor()).thenReturn(_taskExecutor);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index 14ff640c57..f5a9217ef3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -57,6 +57,7 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
notifyBrokerStarted();
UUID id = UUID.randomUUID();
ConfiguredObject object = mock(VirtualHost.class);
+ when(object.isDurable()).thenReturn(true);
when(object.getId()).thenReturn(id);
ConfiguredObjectRecord record = mock(ConfiguredObjectRecord.class);
when(object.asObjectRecord()).thenReturn(record);
@@ -69,11 +70,13 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
notifyBrokerStarted();
Broker broker = mock(Broker.class);
when(broker.getCategoryClass()).thenReturn(Broker.class);
+ when(broker.isDurable()).thenReturn(true);
VirtualHost child = mock(VirtualHost.class);
when(child.getCategoryClass()).thenReturn(VirtualHost.class);
Model model = mock(Model.class);
when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>emptyList());
when(child.getModel()).thenReturn(model);
+ when(child.isDurable()).thenReturn(true);
_listener.childAdded(broker, child);
verify(_store).update(eq(true), any(ConfiguredObjectRecord.class));
}
@@ -83,15 +86,17 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
notifyBrokerStarted();
Broker broker = mock(Broker.class);
when(broker.getCategoryClass()).thenReturn(Broker.class);
+ when(broker.isDurable()).thenReturn(true);
_listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1);
verify(_store).update(eq(false),any(ConfiguredObjectRecord.class));
}
- public void testChildAddedForVirtualHostNode()
+ public void testChildAddedWhereParentManagesChildStorage()
{
notifyBrokerStarted();
VirtualHostNode<?> object = mock(VirtualHostNode.class);
+ when(object.managesChildStorage()).thenReturn(true);
VirtualHost<?,?,?> virtualHost = mock(VirtualHost.class);
_listener.childAdded(object, virtualHost);
verifyNoMoreInteractions(_store);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index ca440bc432..d625fcba75 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -103,16 +102,23 @@ public class MockConsumer implements ConsumerTarget
{
if(_messageIds != null)
{
- SimpleFilterManager filters = new SimpleFilterManager();
- filters.add(new MessageFilter()
+ FilterManager filters = new FilterManager();
+ MessageFilter filter = new MessageFilter()
{
@Override
+ public String getName()
+ {
+ return "";
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
final String messageId = message.getMessageHeader().getMessageId();
return _messageIds.contains(messageId);
}
- });
+ };
+ filters.add(filter.getName(), filter);
return filters;
}
else
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 4485d5cc85..c21a386eaa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -45,6 +45,7 @@ import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
@@ -66,6 +67,7 @@ public class VirtualHostTest extends QpidTestCase
private VirtualHostNode<?> _virtualHostNode;
private DurableConfigurationStore _configStore;
private VirtualHost<?, ?, ?> _virtualHost;
+ private StoreConfigurationChangeListener _storeConfigurationChangeListener;
@Override
protected void setUp() throws Exception
@@ -79,9 +81,13 @@ public class VirtualHostTest extends QpidTestCase
when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
_virtualHostNode = mock(VirtualHostNode.class);
+ when(_virtualHostNode.isDurable()).thenReturn(true);
_configStore = mock(DurableConfigurationStore.class);
+ _storeConfigurationChangeListener = new StoreConfigurationChangeListener(_configStore);
+
when(_virtualHostNode.getConfigurationStore()).thenReturn(_configStore);
+
// Virtualhost needs the EventLogger from the SystemContext.
when(_virtualHostNode.getParent(Broker.class)).thenReturn(_broker);
@@ -122,7 +128,7 @@ public class VirtualHostTest extends QpidTestCase
assertEquals("Unexpected name", virtualHostName, virtualHost.getName());
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
- verify(_configStore).create(matchesRecord(virtualHost.getId(), virtualHost.getType()));
+ verify(_configStore).update(eq(true),matchesRecord(virtualHost.getId(), virtualHost.getType()));
}
public void testDeleteVirtualHost()
@@ -170,7 +176,7 @@ public class VirtualHostTest extends QpidTestCase
virtualHost.start();
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
- verify(_configStore, times(1)).create(matchesRecord(virtualHost.getId(), virtualHost.getType()));
+ verify(_configStore, times(1)).update(eq(true), matchesRecord(virtualHost.getId(), virtualHost.getType()));
verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
}
@@ -293,7 +299,7 @@ public class VirtualHostTest extends QpidTestCase
assertNotNull(queue.getId());
assertEquals(queueName, queue.getName());
- verify(_configStore).create(matchesRecord(queue.getId(), queue.getType()));
+ verify(_configStore).update(eq(true),matchesRecord(queue.getId(), queue.getType()));
}
public void testCreateNonDurableQueue()
@@ -396,7 +402,10 @@ public class VirtualHostTest extends QpidTestCase
attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode);
+ host.addChangeListener(_storeConfigurationChangeListener);
host.create();
+ // Fire the child added event on the node
+ _storeConfigurationChangeListener.childAdded(_virtualHostNode,host);
_virtualHost = host;
return host;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
index 46f205116a..081fbe4edf 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/singleton/AbstractConfiguredObjectTest.java
@@ -22,14 +22,18 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.Subject;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -476,4 +480,84 @@ public class AbstractConfiguredObjectTest extends QpidTestCase
}
+
+ public void testAttributeSetListenerFiring()
+ {
+ final String objectName = "listenerFiring";
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(ConfiguredObject.NAME, objectName);
+ attributes.put(TestSingleton.STRING_VALUE, "first");
+
+ final TestSingleton object = _model.getObjectFactory().create(TestSingleton.class, attributes);
+
+ final AtomicInteger listenerCount = new AtomicInteger();
+ final LinkedHashMap<String, String> updates = new LinkedHashMap<>();
+ object.addChangeListener(new NoopConfigurationChangeListener()
+ {
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ listenerCount.incrementAndGet();
+ String delta = String.valueOf(oldAttributeValue) + "=>" + String.valueOf(newAttributeValue);
+ updates.put(attributeName, delta);
+ }
+ });
+
+ // Set updated value (should cause listener to fire)
+ object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second"));
+
+ assertEquals(1, listenerCount.get());
+ String delta = updates.remove(TestSingleton.STRING_VALUE);
+ assertEquals("first=>second", delta);
+
+ // Set unchanged value (should not cause listener to fire)
+ object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "second"));
+ assertEquals(1, listenerCount.get());
+
+ // Set value to null (should cause listener to fire)
+ object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null));
+ assertEquals(2, listenerCount.get());
+ delta = updates.remove(TestSingleton.STRING_VALUE);
+ assertEquals("second=>null", delta);
+
+ // Set to null again (should not cause listener to fire)
+ object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, null));
+ assertEquals(2, listenerCount.get());
+
+ // Set updated value (should cause listener to fire)
+ object.setAttributes(Collections.singletonMap(TestSingleton.STRING_VALUE, "third"));
+ assertEquals(3, listenerCount.get());
+ delta = updates.remove(TestSingleton.STRING_VALUE);
+ assertEquals("null=>third", delta);
+ }
+
+ private static class NoopConfigurationChangeListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
+ {
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index 799fc71d74..2427470707 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -93,6 +93,7 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message = createTestServerMessage(null);
QueueEntry addedEntry = _list.add(message);
+ addedEntry.acquire();
addedEntry.delete();
int numberOfEntries = countEntries(_list);
@@ -113,6 +114,7 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
QueueEntry addedEntry = _list.add(message);
+ addedEntry.acquire();
addedEntry.delete();
int numberOfEntries = countEntries(_list);
@@ -173,6 +175,7 @@ public class LastValueQueueListTest extends TestCase
assertEquals(1, countEntries(_list));
assertEquals(1, _list.getLatestValuesMap().size());
+ addedEntry.acquire();
addedEntry.delete();
assertEquals(0, countEntries(_list));
@@ -193,7 +196,9 @@ public class LastValueQueueListTest extends TestCase
assertEquals(2, countEntries(_list));
assertEquals(2, _list.getLatestValuesMap().size());
+ addedEntry1.acquire();
addedEntry1.delete();
+ addedEntry2.acquire();
addedEntry2.delete();
assertEquals(0, countEntries(_list));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 40b6c1bebd..2101a2297f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -113,6 +113,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
private void delete()
{
+ _queueEntry.acquire();
_queueEntry.delete();
assertTrue("Queue entry should be in DELETED state after invoking of delete method",
_queueEntry.isDeleted());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index a0ab7cd454..7f5ea06dcf 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -196,6 +196,7 @@ public abstract class QueueEntryListTestBase extends TestCase
final QueueEntry head = getTestList().getHead();
final QueueEntry first = getTestList().next(head);
+ first.acquire();
first.delete();
final QueueEntry second = getTestList().next(head);
@@ -226,6 +227,7 @@ public abstract class QueueEntryListTestBase extends TestCase
assertNull(list.next(queueEntry2));
//'delete' the 2nd QueueEntry
+ queueEntry2.acquire();
queueEntry2.delete();
assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index a2d314d629..a27db98400 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -109,6 +109,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
public void testTraverseWithDeletedEntries()
{
// Delete 2nd queue entry
+ _queueEntry2.acquire();
_queueEntry2.delete();
assertTrue(_queueEntry2.isDeleted());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index d9a176c688..f88ce5f5f9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -137,6 +137,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
public void testTraverseWithDeletedEntries()
{
// Delete 2nd queue entry
+ _queueEntry3.acquire();
_queueEntry3.delete();
assertTrue(_queueEntry3.isDeleted());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 28d22a5a44..73d14a843f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -155,7 +155,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
public void testScavenge() throws Exception
{
- OrderedQueueEntryList sqel = new StandardQueueEntryList(null);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class));
ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
@@ -215,6 +215,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
{
QueueEntry entry = entriesMap.remove(pos);
boolean wasDeleted = entry.isDeleted();
+ entry.acquire();
entry.delete();
return entry.isDeleted() && !wasDeleted;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java
new file mode 100644
index 0000000000..9d47ed496a
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/FileHelperTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFileAttributeView;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class FileHelperTest extends QpidTestCase
+{
+ private static final String TEST_FILE_PERMISSIONS = "rwxr-x---";
+ private File _testFile;
+ private FileHelper _fileHelper;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _testFile = new File(TMP_FOLDER, "test-" + System.currentTimeMillis());
+ _fileHelper = new FileHelper();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ Files.deleteIfExists(_testFile.toPath());
+ }
+ }
+
+ public void testCreateNewFile() throws Exception
+ {
+ assertFalse("File should not exist", _testFile.exists());
+ Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS);
+ assertTrue("File was not created", path.toFile().exists());
+ if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class))
+ {
+ assertPermissions(path);
+ }
+ }
+
+ public void testCreateNewFileUsingRelativePath() throws Exception
+ {
+ _testFile = new File("./tmp-" + System.currentTimeMillis());
+ assertFalse("File should not exist", _testFile.exists());
+ Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS);
+ assertTrue("File was not created", path.toFile().exists());
+ if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class))
+ {
+ assertPermissions(path);
+ }
+ }
+
+ public void testWriteFileSafely() throws Exception
+ {
+ Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS);
+ _fileHelper.writeFileSafely(path, new BaseAction<File, IOException>()
+ {
+ @Override
+ public void performAction(File file) throws IOException
+ {
+ Files.write(file.toPath(), "test".getBytes("UTF8"));
+ assertEquals("Unexpected name", _testFile.getAbsolutePath() + ".tmp", file.getPath());
+ }
+ });
+
+ assertTrue("File was not created", path.toFile().exists());
+
+ if (Files.getFileStore(path).supportsFileAttributeView(PosixFileAttributeView.class))
+ {
+ assertPermissions(path);
+ }
+
+ String content = new String(Files.readAllBytes(path), "UTF-8");
+ assertEquals("Unexpected file content", "test", content);
+ }
+
+ public void testAtomicFileMoveOrReplace() throws Exception
+ {
+ Path path = _fileHelper.createNewFile(_testFile, TEST_FILE_PERMISSIONS);
+ Files.write(path, "test".getBytes("UTF8"));
+ _testFile = _fileHelper.atomicFileMoveOrReplace(path, path.resolveSibling(_testFile.getName() + ".target")).toFile();
+
+ assertFalse("File was not moved", path.toFile().exists());
+ assertTrue("Target file does not exist", _testFile.exists());
+
+ if (Files.getFileStore(_testFile.toPath()).supportsFileAttributeView(PosixFileAttributeView.class))
+ {
+ assertPermissions(_testFile.toPath());
+ }
+ }
+
+
+ private void assertPermissions(Path path) throws IOException
+ {
+ Set<PosixFilePermission> permissions = Files.getPosixFilePermissions(path);
+ assertTrue("Unexpected owner read permission", permissions.contains(PosixFilePermission.OWNER_READ));
+ assertTrue("Unexpected owner write permission", permissions.contains(PosixFilePermission.OWNER_WRITE));
+ assertTrue("Unexpected owner exec permission", permissions.contains(PosixFilePermission.OWNER_EXECUTE));
+ assertTrue("Unexpected group read permission", permissions.contains(PosixFilePermission.GROUP_READ));
+ assertFalse("Unexpected group write permission", permissions.contains(PosixFilePermission.GROUP_WRITE));
+ assertTrue("Unexpected group exec permission", permissions.contains(PosixFilePermission.GROUP_EXECUTE));
+ assertFalse("Unexpected others read permission", permissions.contains(PosixFilePermission.OTHERS_READ));
+ assertFalse("Unexpected others write permission", permissions.contains(PosixFilePermission.OTHERS_WRITE));
+ assertFalse("Unexpected others exec permission", permissions.contains(PosixFilePermission.OTHERS_EXECUTE));
+ }
+}