diff options
| author | Alex Rudyy <orudyy@apache.org> | 2013-02-14 16:34:53 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2013-02-14 16:34:53 +0000 |
| commit | 899dbba6fb34568de45498ac9f8d564b55783814 (patch) | |
| tree | 1aa10cab21623d7a793ef18dde53f366c3451afa | |
| parent | 803d2ad92d998511e7fb33f601a46beb5b8813f0 (diff) | |
| download | qpid-python-899dbba6fb34568de45498ac9f8d564b55783814.tar.gz | |
QPID-4390: Use TaskExecutor to perform changes to the configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1446270 13f79535-47bb-0310-9956-ffa450edef68
31 files changed, 430 insertions, 213 deletions
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 17434f1b67..59dbc6e530 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -136,7 +136,7 @@ public class HttpManagement extends AbstractPluginAdapter public HttpManagement(UUID id, Broker broker, Map<String, Object> attributes) { - super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)); + super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), broker.getTaskExecutor()); _broker = broker; addParent(Broker.class, broker); } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java index 25a56f7a00..f22cce21b0 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java @@ -95,7 +95,7 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio public JMXManagement(UUID id, Broker broker, Map<String, Object> attributes) { - super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)); + super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), broker.getTaskExecutor()); _broker = broker; addParent(Broker.class, broker); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java index 012f8a7133..8a15a48be7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java @@ -16,6 +16,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory; import org.apache.qpid.server.model.adapter.BrokerAdapter; import org.apache.qpid.server.model.adapter.PortFactory; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.group.GroupPrincipalAccessor; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -28,10 +29,11 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer<Broker> private final RootMessageLogger _rootMessageLogger; private final AuthenticationProviderFactory _authenticationProviderFactory; private final PortFactory _portFactory; + private final TaskExecutor _taskExecutor; public BrokerRecoverer(AuthenticationProviderFactory authenticationProviderFactory, PortFactory portFactory, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry, LogRecorder logRecorder, - RootMessageLogger rootMessageLogger) + RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor) { _portFactory = portFactory; _authenticationProviderFactory = authenticationProviderFactory; @@ -39,13 +41,14 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer<Broker> _virtualHostRegistry = virtualHostRegistry; _logRecorder = logRecorder; _rootMessageLogger = rootMessageLogger; + _taskExecutor = taskExecutor; } @Override public Broker create(RecovererProvider recovererProvider, ConfigurationEntry entry, ConfiguredObject... parents) { BrokerAdapter broker = new BrokerAdapter(entry.getId(), entry.getAttributes(), _statisticsGatherer, _virtualHostRegistry, - _logRecorder, _rootMessageLogger, _authenticationProviderFactory, _portFactory); + _logRecorder, _rootMessageLogger, _authenticationProviderFactory, _portFactory, _taskExecutor); Map<String, Collection<ConfigurationEntry>> childEntries = entry.getChildren(); for (String type : childEntries.keySet()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java index 9879d5c238..15cb229d8a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProvider.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory; import org.apache.qpid.server.model.adapter.PortFactory; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.AuthenticationManagerFactory; import org.apache.qpid.server.plugin.GroupManagerFactory; import org.apache.qpid.server.plugin.PluginFactory; @@ -52,9 +53,10 @@ public class DefaultRecovererProvider implements RecovererProvider private final PortFactory _portFactory; private final QpidServiceLoader<GroupManagerFactory> _groupManagerServiceLoader; private final QpidServiceLoader<PluginFactory> _pluginFactoryServiceLoader; + private final TaskExecutor _taskExecutor; public DefaultRecovererProvider(StatisticsGatherer brokerStatisticsGatherer, VirtualHostRegistry virtualHostRegistry, - LogRecorder logRecorder, RootMessageLogger rootMessageLogger) + LogRecorder logRecorder, RootMessageLogger rootMessageLogger, TaskExecutor taskExecutor) { _authenticationProviderFactory = new AuthenticationProviderFactory(new QpidServiceLoader<AuthenticationManagerFactory>()); _portFactory = new PortFactory(); @@ -64,6 +66,7 @@ public class DefaultRecovererProvider implements RecovererProvider _rootMessageLogger = rootMessageLogger; _groupManagerServiceLoader = new QpidServiceLoader<GroupManagerFactory>(); _pluginFactoryServiceLoader = new QpidServiceLoader<PluginFactory>(); + _taskExecutor = taskExecutor; } @Override @@ -72,7 +75,7 @@ public class DefaultRecovererProvider implements RecovererProvider if (Broker.class.getSimpleName().equals(type)) { return new BrokerRecoverer(_authenticationProviderFactory, _portFactory, _brokerStatisticsGatherer, _virtualHostRegistry, - _logRecorder, _rootMessageLogger); + _logRecorder, _rootMessageLogger, _taskExecutor); } else if(VirtualHost.class.getSimpleName().equals(type)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java index 1a2932f333..4f863adfb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/VirtualHostRecoverer.java @@ -45,7 +45,7 @@ public class VirtualHostRecoverer implements ConfiguredObjectRecoverer<VirtualHo { Broker broker = RecovererHelper.verifyOnlyBrokerIsParent(parents); - return new VirtualHostAdapter(entry.getId(), entry.getAttributes(),broker, _brokerStatisticsGatherer); + return new VirtualHostAdapter(entry.getId(), entry.getAttributes(), broker, _brokerStatisticsGatherer, broker.getTaskExecutor()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java new file mode 100644 index 0000000000..b6de1e136a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.updater; + +import java.util.concurrent.Callable; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.State; + +public final class ChangeStateTask implements Callable<State> +{ + private ConfiguredObject _object; + private State _expectedState; + private State _desiredState; + + public ChangeStateTask(ConfiguredObject object, State expectedState, State desiredState) + { + _object = object; + _expectedState = expectedState; + _desiredState = desiredState; + } + + public ConfiguredObject getObject() + { + return _object; + } + + public State getExpectedState() + { + return _expectedState; + } + + public State getDesiredState() + { + return _desiredState; + } + + @Override + public State call() + { + return _object.setDesiredState(_expectedState, _desiredState); + } + + @Override + public String toString() + { + return "ChangeStateTask [object=" + _object + ", expectedState=" + _expectedState + ", desiredState=" + _desiredState + "]"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java new file mode 100644 index 0000000000..d3a8f5b797 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.updater; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.qpid.server.model.ConfiguredObject; + +public final class CreateChildTask implements Callable<ConfiguredObject> +{ + private ConfiguredObject _object; + private Class<? extends ConfiguredObject> _childClass; + private Map<String, Object> _attributes; + private ConfiguredObject[] _otherParents; + + public CreateChildTask(ConfiguredObject object, Class<? extends ConfiguredObject> childClass, Map<String, Object> attributes, + ConfiguredObject... otherParents) + { + _object = object; + _childClass = childClass; + _attributes = attributes; + _otherParents = otherParents; + } + + public ConfiguredObject getObject() + { + return _object; + } + + public Class<? extends ConfiguredObject> getChildClass() + { + return _childClass; + } + + public Map<String, Object> getAttributes() + { + return _attributes; + } + + public ConfiguredObject[] getOtherParents() + { + return _otherParents; + } + + @Override + public ConfiguredObject call() + { + return _object.createChild(_childClass, _attributes, _otherParents); + } + + @Override + public String toString() + { + return "CreateChildTask [object=" + _object + ", childClass=" + _childClass + ", attributes=" + _attributes + + ", otherParents=" + Arrays.toString(_otherParents) + "]"; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java new file mode 100644 index 0000000000..94649434e6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.updater; + +import java.util.concurrent.Callable; + +import org.apache.qpid.server.model.ConfiguredObject; + +public final class SetAttributeTask implements Callable<Object> +{ + private ConfiguredObject _object; + private String _attributeName; + private Object _expectedValue; + private Object _desiredValue; + + public SetAttributeTask(ConfiguredObject object, String attributeName, Object expectedValue, Object desiredValue) + { + _object = object; + _attributeName = attributeName; + _expectedValue = expectedValue; + _desiredValue = desiredValue; + } + + public ConfiguredObject getObject() + { + return _object; + } + + public String getAttributeName() + { + return _attributeName; + } + + public Object getExpectedValue() + { + return _expectedValue; + } + + public Object getDesiredValue() + { + return _desiredValue; + } + + @Override + public Object call() + { + return _object.setAttribute(_attributeName, _expectedValue, _desiredValue); + } + + @Override + public String toString() + { + return "SetAttributeTask [object=" + _object + ", attributeName=" + _attributeName + ", expectedValue=" + _expectedValue + + ", desiredValue=" + _desiredValue + "]"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java index 717f702337..fbecf1965b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Broker.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -190,4 +191,6 @@ public interface Broker extends ConfiguredObject KeyStore getDefaultKeyStore(); TrustStore getDefaultTrustStore(); + + TaskExecutor getTaskExecutor(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java index e1a2921154..2ecee2aee3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java @@ -31,6 +31,10 @@ import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.configuration.updater.ChangeStateTask; +import org.apache.qpid.server.configuration.updater.CreateChildTask; +import org.apache.qpid.server.configuration.updater.SetAttributeTask; +import org.apache.qpid.server.configuration.updater.TaskExecutor; abstract class AbstractAdapter implements ConfiguredObject { @@ -42,9 +46,11 @@ abstract class AbstractAdapter implements ConfiguredObject private final UUID _id; private final Map<String, Object> _defaultAttributes = new HashMap<String, Object>(); + private final TaskExecutor _taskExecutor; - protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes) + protected AbstractAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor) { + _taskExecutor = taskExecutor; _id = id; if (attributes != null) { @@ -63,14 +69,9 @@ abstract class AbstractAdapter implements ConfiguredObject } } - protected AbstractAdapter(UUID id) + protected AbstractAdapter(UUID id, TaskExecutor taskExecutor) { - this(id, null, null); - } - - protected AbstractAdapter(UUID id, Map<String, Object> defaults) - { - this(id, defaults, null); + this(id, null, null, taskExecutor); } public final UUID getId() @@ -87,9 +88,16 @@ abstract class AbstractAdapter implements ConfiguredObject public final State setDesiredState(final State currentState, final State desiredState) throws IllegalStateTransitionException, AccessControlException { - if (setState(currentState, desiredState)) + if (_taskExecutor.isTaskExecutorThread()) { - notifyStateChanged(currentState, desiredState); + if (setState(currentState, desiredState)) + { + notifyStateChanged(currentState, desiredState); + } + } + else + { + _taskExecutor.submitAndWait(new ChangeStateTask(this, currentState, desiredState)); } return getActualState(); } @@ -137,7 +145,6 @@ abstract class AbstractAdapter implements ConfiguredObject } } - protected void childAdded(ConfiguredObject child) { synchronized (_changeListeners) @@ -149,7 +156,6 @@ abstract class AbstractAdapter implements ConfiguredObject } } - protected void childRemoved(ConfiguredObject child) { synchronized (_changeListeners) @@ -161,7 +167,6 @@ abstract class AbstractAdapter implements ConfiguredObject } } - private final Object getDefaultAttribute(String name) { return _defaultAttributes.get(name); @@ -198,6 +203,19 @@ abstract class AbstractAdapter implements ConfiguredObject public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { + if (_taskExecutor.isTaskExecutorThread()) + { + return changeAttribute(name, expected, desired); + } + else + { + _taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired)); + return getAttribute(name); + } + } + + protected Object changeAttribute(final String name, final Object expected, final Object desired) + { synchronized (_attributes) { Object currentValue = getAttribute(name); @@ -251,4 +269,35 @@ abstract class AbstractAdapter implements ConfiguredObject { return getClass().getSimpleName() + " [id=" + _id + "]"; } + + @SuppressWarnings("unchecked") + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + if (_taskExecutor.isTaskExecutorThread()) + { + C child = addChild(childClass, attributes, otherParents); + if (child != null) + { + childAdded(child); + } + return child; + } + else + { + return (C)_taskExecutor.submitAndWait(new CreateChildTask(this, childClass, attributes, otherParents)); + } + } + + protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } + + + protected TaskExecutor getTaskExecutor() + { + return _taskExecutor; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java index a479de9769..ebd98f915d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java @@ -42,7 +42,7 @@ public abstract class AbstractKeyStoreAdapter extends AbstractAdapter protected AbstractKeyStoreAdapter(UUID id, Broker broker, Map<String, Object> attributes) { - super(id); + super(id, broker.getTaskExecutor()); addParent(Broker.class, broker); _name = MapValueConverter.getStringAttribute(TrustStore.NAME, attributes); _password = MapValueConverter.getStringAttribute(TrustStore.PASSWORD, attributes); @@ -185,14 +185,14 @@ public abstract class AbstractKeyStoreAdapter extends AbstractAdapter private void setMandatoryAttribute(String name, Map<String, Object> attributeValues) { - setAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); + changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); } private void setOptionalAttribute(String name, Map<String, Object> attributeValues) { if (attributeValues.get(name) != null) { - setAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); + changeAttribute(name, null, MapValueConverter.getStringAttribute(name, attributeValues)); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java index 4ebe45e1ee..ed4af9881f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Plugin; @@ -35,9 +36,9 @@ import org.apache.qpid.server.model.Statistics; public abstract class AbstractPluginAdapter extends AbstractAdapter implements Plugin { - protected AbstractPluginAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes) + protected AbstractPluginAdapter(UUID id, Map<String, Object> defaults, Map<String, Object> attributes, TaskExecutor taskExecutor) { - super(id, defaults, attributes); + super(id, defaults, attributes, taskExecutor); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java index f8b5b3016f..4cc76f4fd3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java @@ -45,6 +45,7 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.User; import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -67,7 +68,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana private AuthenticationProviderAdapter(UUID id, Broker broker, final T authManager, Map<String, Object> attributes) { - super(id, null, attributes); + super(id, null, attributes, broker.getTaskExecutor()); _authManager = authManager; _broker = broker; _type = authManager instanceof PrincipalDatabaseAuthenticationManager? PrincipalDatabaseAuthenticationManager.class.getSimpleName() : AuthenticationManager.class.getSimpleName() ; @@ -198,15 +199,6 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, - Map<String, Object> attributes, - ConfiguredObject... otherParents) - { - throw new IllegalArgumentException("This authentication provider does not support" + - " creating children of type: " + childClass); - } - - @Override public boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { @@ -250,6 +242,14 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana { super(id, broker,authManager, attributes); } + + @Override + public <C extends ConfiguredObject> C createChild(Class<C> childClass, + Map<String, Object> attributes, + ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } } public static class PrincipalDatabaseAuthenticationManagerAdapter @@ -330,7 +330,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { @@ -343,7 +343,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana if(createUser(username, password,null)) { @SuppressWarnings("unchecked") - C pricipalAdapter = (C) new PrincipalAdapter(p); + C pricipalAdapter = (C) new PrincipalAdapter(p, getTaskExecutor()); return pricipalAdapter; } else @@ -353,7 +353,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } } - return super.createChild(childClass, attributes, otherParents); + return super.addChild(childClass, attributes, otherParents); } @Override @@ -365,7 +365,7 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana Collection<User> principals = new ArrayList<User>(users.size()); for(Principal user : users) { - principals.add(new PrincipalAdapter(user)); + principals.add(new PrincipalAdapter(user, getTaskExecutor())); } @SuppressWarnings("unchecked") Collection<C> unmodifiablePrincipals = (Collection<C>) Collections.unmodifiableCollection(principals); @@ -382,9 +382,9 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana private final Principal _user; - public PrincipalAdapter(Principal user) + public PrincipalAdapter(Principal user, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName())); + super(UUIDGenerator.generateUserUUID(PrincipalDatabaseAuthenticationManagerAdapter.this.getName(), user.getName()), taskExecutor); _user = user; } @@ -505,16 +505,14 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana } @Override - public Object setAttribute(String name, Object expected, Object desired) + public Object changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { if(name.equals(PASSWORD)) { setPassword((String)desired); } - return super.setAttribute(name, - expected, - desired); + return super.changeAttribute(name, expected, desired); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index 6fa4d31fe5..eb2d0dd7e2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -48,7 +48,7 @@ final class BindingAdapter extends AbstractAdapter implements Binding ExchangeAdapter exchangeAdapter, QueueAdapter queueAdapter) { - super(binding.getId()); + super(binding.getId(), queueAdapter.getTaskExecutor()); _binding = binding; _exchange = exchangeAdapter; _queue = queueAdapter; @@ -206,13 +206,6 @@ final class BindingAdapter extends AbstractAdapter implements Binding } @Override - public Object setAttribute(final String name, final Object expected, final Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - } - - @Override public Collection<String> getAttributeNames() { return Binding.AVAILABLE_ATTRIBUTES; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index a60e50d7c7..4adb3b0fdd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -53,6 +53,7 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.group.FileGroupManager; import org.apache.qpid.server.security.group.GroupManager; import org.apache.qpid.server.security.group.GroupPrincipalAccessor; @@ -166,9 +167,9 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat public BrokerAdapter(UUID id, Map<String, Object> attributes, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry, LogRecorder logRecorder, RootMessageLogger rootMessageLogger, AuthenticationProviderFactory authenticationProviderFactory, - PortFactory portFactory) + PortFactory portFactory, TaskExecutor taskExecutor) { - super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)); + super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); _statisticsGatherer = statisticsGatherer; _virtualHostRegistry = virtualHostRegistry; _logRecorder = logRecorder; @@ -301,16 +302,9 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat throws AccessControlException, IllegalArgumentException { final VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(UUID.randomUUID(), attributes, this, - _statisticsGatherer); - - synchronized (_vhostAdapters) - { - _vhostAdapters.put(virtualHostAdapter.getName(), virtualHostAdapter); - } - - virtualHostAdapter.setState(State.INITIALISING, State.ACTIVE); - childAdded(virtualHostAdapter); - + _statisticsGatherer, getTaskExecutor()); + addVirtualHost(virtualHostAdapter); + virtualHostAdapter.setDesiredState(State.INITIALISING, State.ACTIVE); return virtualHostAdapter; } @@ -416,7 +410,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat //TODO: ACL @SuppressWarnings("unchecked") @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == VirtualHost.class) { @@ -436,7 +430,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat } } - public void addPort(Port port) + private void addPort(Port port) { synchronized (_portAdapters) { @@ -454,7 +448,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat { Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes); addPort(port); - childAdded(port); return port; } @@ -465,14 +458,13 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat AuthenticationProvider authenticationProvider = _authenticationProviderFactory.create(UUID.randomUUID(), this, attributes, groupPrincipalAccessor); addAuthenticationProvider(authenticationProvider); - childAdded(authenticationProvider); return authenticationProvider; } /** * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists */ - public void addAuthenticationProvider(AuthenticationProvider authenticationProvider) + private void addAuthenticationProvider(AuthenticationProvider authenticationProvider) { String name = authenticationProvider.getName(); synchronized (_authenticationProviders) @@ -486,7 +478,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat authenticationProvider.addChangeListener(this); } - public void addGroupProvider(GroupProvider groupProvider) + private void addGroupProvider(GroupProvider groupProvider) { synchronized (_groupProviders) { @@ -505,7 +497,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat throw new UnsupportedOperationException("Not implemented yet!"); } - public void addKeyStore(KeyStore keyStore) + private void addKeyStore(KeyStore keyStore) { synchronized (_keyStores) { @@ -523,7 +515,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat throw new UnsupportedOperationException("Not implemented yet!"); } - public void addTrustStore(TrustStore trustStore) + private void addTrustStore(TrustStore trustStore) { synchronized (_trustStores) { @@ -624,13 +616,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat return super.getAttribute(name); } - @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement. - } - private boolean deletePort(Port portAdapter) { Port removedPort = null; @@ -651,7 +636,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat return removedAuthenticationProvider != null; } - public void addVirtualHost(VirtualHost virtualHost) + private void addVirtualHost(VirtualHost virtualHost) { synchronized (_vhostAdapters) { @@ -774,7 +759,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat // no-op } - public void addPlugin(ConfiguredObject plugin) + private void addPlugin(ConfiguredObject plugin) { synchronized(_plugins) { @@ -909,4 +894,9 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat return _trustStores.get(_defaultTrustStoreId); } + @Override + public TaskExecutor getTaskExecutor() + { + return super.getTaskExecutor(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index c160d65b5f..84f99e1f17 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -50,9 +51,9 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection new HashMap<AMQSessionModel, SessionAdapter>(); private final Statistics _statistics; - public ConnectionAdapter(final AMQConnectionModel conn) + public ConnectionAdapter(final AMQConnectionModel conn, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateRandomUUID()); + super(UUIDGenerator.generateRandomUUID(), taskExecutor); _connection = conn; _statistics = new ConnectionStatisticsAdapter(conn); } @@ -74,7 +75,7 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection { if(!_sessionAdapters.containsKey(session)) { - _sessionAdapters.put(session, new SessionAdapter(session)); + _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor())); } } return new ArrayList<Session>(_sessionAdapters.values()); @@ -199,52 +200,6 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } @Override - public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException - { - if(name.equals(CLIENT_ID)) - { - - } - else if(name.equals(CLIENT_VERSION)) - { - - } - else if(name.equals(INCOMING)) - { - - } - else if(name.equals(LOCAL_ADDRESS)) - { - - } - else if(name.equals(PRINCIPAL)) - { - - } - else if(name.equals(PROPERTIES)) - { - - } - else if(name.equals(REMOTE_ADDRESS)) - { - - } - else if(name.equals(REMOTE_PROCESS_NAME)) - { - - } - else if(name.equals(REMOTE_PROCESS_PID)) - { - - } - else if(name.equals(SESSION_COUNT_LIMIT)) - { - - } - return super.setAttribute(name, expected, desired); - } - - @Override public Collection<String> getAttributeNames() { final HashSet<String> attrNames = new HashSet<String>(super.getAttributeNames()); @@ -270,7 +225,8 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection } } - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + @Override + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Session.class) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index 494e226dc4..e6d3fab2f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -45,7 +45,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer queueAdapter.getName(), subscription.getSessionModel().getConnectionModel().getRemoteAddressString(), String.valueOf(subscription.getSessionModel().getChannelId()), - subscription.getConsumerName())); + subscription.getConsumerName()), queueAdapter.getTaskExecutor()); _subscription = subscription; _queue = queueAdapter; _statistics = new ConsumerStatistics(); @@ -108,13 +108,6 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer } @Override - public Object setAttribute(final String name, final Object expected, final Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - } - - @Override public Object getAttribute(final String name) { if(ID.equals(name)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index b945180137..5d5f3f7378 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -56,7 +56,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa public ExchangeAdapter(final VirtualHostAdapter virtualHostAdapter, final org.apache.qpid.server.exchange.Exchange exchange) { - super(exchange.getId()); + super(exchange.getId(), virtualHostAdapter.getTaskExecutor()); _statistics = new ExchangeStatistics(); _vhost = virtualHostAdapter; _exchange = exchange; @@ -256,7 +256,7 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == org.apache.qpid.server.model.Binding.class) { @@ -368,13 +368,6 @@ final class ExchangeAdapter extends AbstractAdapter implements Exchange, org.apa } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - - @Override public Collection<String> getAttributeNames() { return AVAILABLE_ATTRIBUTES; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java index ecb10202e9..0fa834bc28 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.group.GroupManager; import org.apache.qpid.server.security.SecurityManager; @@ -49,7 +50,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements private final Broker _broker; public GroupProviderAdapter(UUID id, GroupManager groupManager, Broker broker) { - super(id); + super(id, broker.getTaskExecutor()); if (groupManager == null) { @@ -174,7 +175,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if (childClass == Group.class) @@ -184,7 +185,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements if (getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName)) { _groupManager.createGroup(groupName); - return (C) new GroupAdapter(groupName); + return (C) new GroupAdapter(groupName, getTaskExecutor()); } else { @@ -208,7 +209,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements Collection<Group> principals = new ArrayList<Group>(groups.size()); for (Principal group : groups) { - principals.add(new GroupAdapter(group.getName())); + principals.add(new GroupAdapter(group.getName(), getTaskExecutor())); } return (Collection<C>) Collections .unmodifiableCollection(principals); @@ -228,9 +229,9 @@ public class GroupProviderAdapter extends AbstractAdapter implements { private final String _group; - public GroupAdapter(String group) + public GroupAdapter(String group, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateGroupUUID(GroupProviderAdapter.this.getName(), group)); + super(UUIDGenerator.generateGroupUUID(GroupProviderAdapter.this.getName(), group), taskExecutor); _group = group; } @@ -312,7 +313,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements Collection<GroupMember> members = new ArrayList<GroupMember>(); for (Principal principal : usersInGroup) { - members.add(new GroupMemberAdapter(principal.getName())); + members.add(new GroupMemberAdapter(principal.getName(), getTaskExecutor())); } return (Collection<C>) Collections .unmodifiableCollection(members); @@ -325,7 +326,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { @@ -336,7 +337,7 @@ public class GroupProviderAdapter extends AbstractAdapter implements if (getSecurityManager().authoriseGroupOperation(Operation.UPDATE, _group)) { _groupManager.addUserToGroup(memberName, _group); - return (C) new GroupMemberAdapter(memberName); + return (C) new GroupMemberAdapter(memberName, getTaskExecutor()); } else { @@ -371,14 +372,6 @@ public class GroupProviderAdapter extends AbstractAdapter implements } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - return super.setAttribute(name, expected, desired); - } - - @Override protected boolean setState(State currentState, State desiredState) throws IllegalStateTransitionException, AccessControlException { @@ -403,9 +396,9 @@ public class GroupProviderAdapter extends AbstractAdapter implements { private String _memberName; - public GroupMemberAdapter(String memberName) + public GroupMemberAdapter(String memberName, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateGroupMemberUUID(GroupProviderAdapter.this.getName(), _group, memberName)); + super(UUIDGenerator.generateGroupMemberUUID(GroupProviderAdapter.this.getName(), _group, memberName), taskExecutor); _memberName = memberName; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java index 64601be926..113d895e62 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/KeyStoreAdapter.java @@ -35,7 +35,7 @@ public class KeyStoreAdapter extends AbstractKeyStoreAdapter implements KeyStore super(id, broker, attributes); if (attributes.get(CERTIFICATE_ALIAS) != null) { - setAttribute(CERTIFICATE_ALIAS, null, attributes.get(CERTIFICATE_ALIAS)); + changeAttribute(CERTIFICATE_ALIAS, null, attributes.get(CERTIFICATE_ALIAS)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java index 090bb59835..cb166d220e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.configuration.updater.TaskExecutor; public class PortAdapter extends AbstractAdapter implements Port { @@ -53,9 +54,9 @@ public class PortAdapter extends AbstractAdapter implements Port * protocols on the same port we need to introduce a special entity like * PortAceptor which will be responsible for port binding/unbinding */ - public PortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaults) + public PortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaults, TaskExecutor taskExecutor) { - super(id, defaults, attributes); + super(id, defaults, attributes, taskExecutor); _broker = broker; addParent(Broker.class, broker); @@ -262,13 +263,6 @@ public class PortAdapter extends AbstractAdapter implements Port } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); - } - - @Override public boolean setState(State currentState, State desiredState) { if (desiredState == State.DELETED) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java index 6488405d64..42c43bd258 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java @@ -105,7 +105,7 @@ public class PortFactory defaults.put(Port.NEED_CLIENT_AUTH, DEFAULT_AMQP_NEED_CLIENT_AUTH); defaults.put(Port.RECEIVE_BUFFER_SIZE, DEFAULT_AMQP_RECEIVE_BUFFER_SIZE); defaults.put(Port.SEND_BUFFER_SIZE, DEFAULT_AMQP_SEND_BUFFER_SIZE); - port = new AmqpPortAdapter(id, broker, attributes, defaults); + port = new AmqpPortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor()); } else { @@ -117,7 +117,7 @@ public class PortFactory } Protocol protocol = protocols.iterator().next(); defaults.put(Port.NAME, portValue + "-" + protocol.name()); - port = new PortAdapter(id, broker, attributes, defaults); + port = new PortAdapter(id, broker, attributes, defaults, broker.getTaskExecutor()); } return port; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index dbb1f13134..d34db0f36e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -80,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue) { - super(queue.getId()); + super(queue.getId(), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter); @@ -207,7 +207,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } @Override - public Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException + public Object changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { try { @@ -306,7 +306,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs return desired; } - return super.setAttribute(name, expected, desired); + return super.changeAttribute(name, expected, desired); } finally { @@ -510,7 +510,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == org.apache.qpid.server.model.Binding.class) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 6807968f14..2fffdb32f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; final class SessionAdapter extends AbstractAdapter implements Session @@ -44,9 +45,9 @@ final class SessionAdapter extends AbstractAdapter implements Session private AMQSessionModel _session; private SessionStatistics _statistics; - public SessionAdapter(final AMQSessionModel session) + public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor) { - super(UUIDGenerator.generateRandomUUID()); + super(UUIDGenerator.generateRandomUUID(), taskExecutor); _session = session; _statistics = new SessionStatistics(); } @@ -141,13 +142,6 @@ final class SessionAdapter extends AbstractAdapter implements Session return super.getAttribute(name); //TODO - Implement } - @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - public Statistics getStatistics() { return _statistics; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 7e51f94a86..ce63825df2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -104,9 +105,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>(); private StatisticsGatherer _brokerStatisticsGatherer; - public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer) + public VirtualHostAdapter(UUID id, Map<String, Object> attributes, Broker broker, StatisticsGatherer brokerStatisticsGatherer, TaskExecutor taskExecutor) { - super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)); + super(id, null, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); validateAttributes(); _broker = broker; _brokerStatisticsGatherer = brokerStatisticsGatherer; @@ -472,7 +473,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Exchange.class) { @@ -572,7 +573,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if(!_connectionAdapters.containsKey(connection)) { - adapter = new ConnectionAdapter(connection); + adapter = new ConnectionAdapter(connection, getTaskExecutor()); _connectionAdapters.put(connection, adapter); } @@ -847,13 +848,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - return super.setAttribute(name, expected, desired); //TODO - Implement - } - - @Override public Collection<String> getAttributeNames() { return AVAILABLE_ATTRIBUTES; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java index fc07556073..91b705b004 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java @@ -43,7 +43,7 @@ public class VirtualHostAliasAdapter extends AbstractAdapter implements Virtual public VirtualHostAliasAdapter(VirtualHostAdapter virtualHostAdapter, Port port) { - super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName())); + super(UUIDGenerator.generateVhostAliasUUID(virtualHostAdapter.getName(), port.getName()), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; _port = port; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 4bd8111f34..995951a462 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -75,6 +76,7 @@ public class ApplicationRegistry implements IApplicationRegistry private LogRecorder _logRecorder; private ConfigurationEntryStore _store; + private TaskExecutor _taskExecutor; protected void setRootMessageLogger(RootMessageLogger rootMessageLogger) { @@ -107,7 +109,10 @@ public class ApplicationRegistry implements IApplicationRegistry { logStartupMessages(CurrentActor.get()); - RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + + RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor); ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); @@ -250,6 +255,11 @@ public class ApplicationRegistry implements IApplicationRegistry //Shutdown virtualhosts close(_virtualHostRegistry); + if (_taskExecutor != null) + { + _taskExecutor.stop(); + } + CurrentActor.get().message(BrokerMessages.STOPPED()); _logRecorder.closeLogRecorder(); @@ -257,6 +267,10 @@ public class ApplicationRegistry implements IApplicationRegistry } finally { + if (_taskExecutor != null) + { + _taskExecutor.stopImmediately(); + } CurrentActor.remove(); } _store = null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/AmqpPortAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/AmqpPortAdapter.java index 6959f6827d..235c766786 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/AmqpPortAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/AmqpPortAdapter.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.adapter.PortAdapter; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -54,9 +55,9 @@ public class AmqpPortAdapter extends PortAdapter private final Broker _broker; private IncomingNetworkTransport _transport; - public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes) + public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor) { - super(id, broker, attributes, defaultAttributes); + super(id, broker, attributes, defaultAttributes, taskExecutor); _broker = broker; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java index eacc904b25..51824463a3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory; import org.apache.qpid.server.model.adapter.PortFactory; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.group.GroupPrincipalAccessor; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -72,7 +73,7 @@ public class BrokerRecovererTest extends TestCase super.setUp(); _brokerRecoverer = new BrokerRecoverer(mock(AuthenticationProviderFactory.class), mock(PortFactory.class), mock(StatisticsGatherer.class), - mock(VirtualHostRegistry.class), mock(LogRecorder.class), mock(RootMessageLogger.class)); + mock(VirtualHostRegistry.class), mock(LogRecorder.class), mock(RootMessageLogger.class), mock(TaskExecutor.class)); when(_brokerEntry.getId()).thenReturn(_brokerId); when(_brokerEntry.getChildren()).thenReturn(_brokerEntryChildren); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProviderTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProviderTest.java index 1a070a6cba..c427fc4a43 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProviderTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/DefaultRecovererProviderTest.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -48,8 +50,9 @@ public class DefaultRecovererProviderTest extends TestCase VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class); LogRecorder logRecorder = mock(LogRecorder.class); RootMessageLogger rootMessageLogger = mock(RootMessageLogger.class); + TaskExecutor taskExecutor = mock(TaskExecutor.class); - DefaultRecovererProvider provider = new DefaultRecovererProvider(statisticsGatherer, virtualHostRegistry, logRecorder, rootMessageLogger); + DefaultRecovererProvider provider = new DefaultRecovererProvider(statisticsGatherer, virtualHostRegistry, logRecorder, rootMessageLogger, taskExecutor); for (String configuredObjectType : supportedTypes) { ConfiguredObjectRecoverer<?> recovever = provider.getRecoverer(configuredObjectType); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/BrokerShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/BrokerShutdownTest.java index 2b1711c5a2..db8dce9409 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/BrokerShutdownTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/BrokerShutdownTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.AuthenticationManagerFactory; import org.apache.qpid.server.security.auth.manager.TestAuthenticationManagerFactory; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -53,9 +54,9 @@ import java.util.UUID; */ public class BrokerShutdownTest extends QpidTestCase { - private Provider[] _defaultProviders; private Broker _broker; + private TaskExecutor _taskExecutor; @Override public void setUp() throws Exception @@ -65,11 +66,31 @@ public class BrokerShutdownTest extends QpidTestCase super.setUp(); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + // Startup the new broker and register the new providers _broker = startBroker(); } - private Broker startBroker() + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_taskExecutor != null) + { + _taskExecutor.stopImmediately(); + } + } + + } + + private Broker startBroker() throws Exception { // test store with only broker and authentication provider entries ConfigurationEntryStore store = new ConfigurationEntryStore() @@ -122,8 +143,9 @@ public class BrokerShutdownTest extends QpidTestCase RootMessageLogger rootMessageLogger = mock(RootMessageLogger.class); // recover the broker from the store - RecovererProvider provider = new DefaultRecovererProvider(statisticsGatherer, virtualHostRegistry, logRecorder, rootMessageLogger); + RecovererProvider provider = new DefaultRecovererProvider(statisticsGatherer, virtualHostRegistry, logRecorder, rootMessageLogger, _taskExecutor); ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); + Broker broker = (Broker) brokerRecoverer.create(provider, store.getRootEntry()); // start broker |
