From 3d34526bc2fe159b265d5a7c58ffd687ba00c499 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 20 Apr 2014 01:32:50 +0000 Subject: QPID-5712 : [Java Broker] Remove VirtualHostRegistry and remove redundant maps from BrokerAdapter git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588715 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/VirtualHostTest.java | 9 +- .../apache/qpid/server/binding/BindingImpl.java | 35 +- .../updater/ChangeAttributesTask.java | 62 --- .../configuration/updater/ChangeStateTask.java | 67 --- .../configuration/updater/CreateChildTask.java | 78 --- .../configuration/updater/SetAttributeTask.java | 2 +- .../server/configuration/updater/TaskExecutor.java | 123 ++++- .../server/model/AbstractConfiguredObject.java | 136 +++-- .../java/org/apache/qpid/server/model/Broker.java | 6 - .../qpid/server/model/SystemContextImpl.java | 199 +++---- .../qpid/server/model/adapter/BrokerAdapter.java | 583 ++++++++------------- .../qpid/server/model/port/AbstractPort.java | 3 +- .../apache/qpid/server/model/port/AmqpPort.java | 3 + .../qpid/server/model/port/AmqpPortImpl.java | 13 + .../manager/ScramSHA1AuthenticationManager.java | 212 +++----- .../server/virtualhost/AbstractVirtualHost.java | 19 +- .../qpid/server/virtualhost/VirtualHostImpl.java | 2 - .../server/virtualhost/VirtualHostRegistry.java | 101 ---- .../startup/VirtualHostCreationTest.java | 16 +- .../configuration/updater/TaskExecutorTest.java | 34 +- .../apache/qpid/server/model/VirtualHostTest.java | 15 +- .../adapter/FileSystemPreferencesProviderTest.java | 9 +- .../apache/qpid/server/util/BrokerTestHelper.java | 18 +- .../qpid/server/virtualhost/MockVirtualHost.java | 6 - .../protocol/v0_10/ServerConnectionDelegate.java | 4 +- .../v0_8/handler/ConnectionOpenMethodHandler.java | 3 +- .../protocol/v0_8/state/AMQStateManager.java | 10 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 4 +- .../MultiVersionProtocolEngineFactoryTest.java | 11 - .../server/store/VirtualHostMessageStoreTest.java | 4 +- 30 files changed, 692 insertions(+), 1095 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index 3e23df6d87..5ad113f827 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -59,6 +59,7 @@ public class VirtualHostTest extends QpidTestCase private File _bdbStorePath; private VirtualHost _host; private ConfigurationEntryStore _store; + private TaskExecutor _taskExecutor; @Override protected void setUp() throws Exception @@ -67,9 +68,9 @@ public class VirtualHostTest extends QpidTestCase _store = mock(ConfigurationEntryStore.class); _broker = BrokerTestHelper.createBrokerMock(); - TaskExecutor taslExecutor = mock(TaskExecutor.class); - when(taslExecutor.isTaskExecutorThread()).thenReturn(true); - when(_broker.getTaskExecutor()).thenReturn(taslExecutor); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); _statisticsGatherer = mock(StatisticsGatherer.class); @@ -89,6 +90,8 @@ public class VirtualHostTest extends QpidTestCase } finally { + _taskExecutor.stopImmediately(); + if (_bdbStorePath != null) { FileUtils.delete(_bdbStorePath, true); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index c1a5f92717..76826bfc56 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; @@ -255,26 +256,22 @@ public class BindingImpl public void setArguments(final Map arguments) { - if(getTaskExecutor().isTaskExecutorThread()) - { - _arguments = arguments; - super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments); - if (isDurable()) - { - VirtualHostImpl vhost = (VirtualHostImpl) _exchange.getParent(VirtualHost.class); - vhost.getDurableConfigurationStore().update(true, asObjectRecord()); - } - } - else - { - getTaskExecutor().submitAndWait(new Runnable() - { - @Override - public void run() + runTask(new TaskExecutor.VoidTask() { - setArguments(arguments); + @Override + public void execute() + { + _arguments = arguments; + BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments); + if (isDurable()) + { + VirtualHostImpl vhost = + (VirtualHostImpl) _exchange.getParent(VirtualHost.class); + vhost.getDurableConfigurationStore().update(true, asObjectRecord()); + } + } } - }); - } + ); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java deleted file mode 100644 index fb672635b4..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.updater; - -import java.util.Map; -import java.util.concurrent.Callable; - -import org.apache.qpid.server.model.ConfiguredObject; - -public class ChangeAttributesTask implements TaskExecutor.Task -{ - private final Map _attributes; - private final ConfiguredObject _object; - - public ChangeAttributesTask(ConfiguredObject target, Map attributes) - { - super(); - _object = target; - _attributes = attributes; - } - - @Override - public Void call() - { - _object.setAttributes(_attributes); - return null; - } - - public Map getAttributes() - { - return _attributes; - } - - public ConfiguredObject getObject() - { - return _object; - } - - @Override - public String toString() - { - return "ChangeAttributesTask [object=" + _object + ", attributes=" + _attributes + "]"; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java deleted file mode 100644 index 42ce1d8c03..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.updater; - -import java.util.concurrent.Callable; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.State; - -public final class ChangeStateTask implements TaskExecutor.Task -{ - private ConfiguredObject _object; - private State _expectedState; - private State _desiredState; - - public ChangeStateTask(ConfiguredObject object, State expectedState, State desiredState) - { - _object = object; - _expectedState = expectedState; - _desiredState = desiredState; - } - - public ConfiguredObject getObject() - { - return _object; - } - - public State getExpectedState() - { - return _expectedState; - } - - public State getDesiredState() - { - return _desiredState; - } - - @Override - public State call() - { - return _object.setDesiredState(_expectedState, _desiredState); - } - - @Override - public String toString() - { - return "ChangeStateTask [object=" + _object + ", expectedState=" + _expectedState + ", desiredState=" + _desiredState + "]"; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java deleted file mode 100644 index 27bc6d73f6..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.updater; - -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.Callable; - -import org.apache.qpid.server.model.ConfiguredObject; - -public final class CreateChildTask implements TaskExecutor.Task -{ - private ConfiguredObject _object; - private Class _childClass; - private Map _attributes; - private ConfiguredObject[] _otherParents; - - public CreateChildTask(ConfiguredObject object, Class childClass, Map attributes, - ConfiguredObject... otherParents) - { - _object = object; - _childClass = childClass; - _attributes = attributes; - _otherParents = otherParents; - } - - public ConfiguredObject getObject() - { - return _object; - } - - public Class getChildClass() - { - return _childClass; - } - - public Map getAttributes() - { - return _attributes; - } - - public ConfiguredObject[] getOtherParents() - { - return _otherParents; - } - - @Override - public ConfiguredObject call() - { - return _object.createChild(_childClass, _attributes, _otherParents); - } - - @Override - public String toString() - { - return "CreateChildTask [object=" + _object + ", childClass=" + _childClass + ", attributes=" + _attributes - + ", otherParents=" + Arrays.toString(_otherParents) + "]"; - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java index f8d29c368b..6ad8b8187f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java @@ -60,7 +60,7 @@ public final class SetAttributeTask implements TaskExecutor.Task } @Override - public Object call() + public Object execute() { return _object.setAttribute(_attributeName, _expectedValue, _desiredValue); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index 4e3601efcc..29b03ff962 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -49,11 +49,27 @@ public class TaskExecutor private final AtomicReference _state; private volatile ExecutorService _executor; - public static interface Task extends Callable + public static interface Task { - X call(); + X execute(); } + public static interface VoidTask + { + void execute(); + } + + public static interface TaskWithException + { + X execute() throws E; + } + + public static interface VoidTaskWithException + { + void execute() throws E; + } + + public TaskExecutor() { _state = new AtomicReference(State.INITIALISING); @@ -142,20 +158,109 @@ public class TaskExecutor return future; } - public void submitAndWait(final Runnable task) throws CancellationException + public void run(final VoidTask task) throws CancellationException { - submitAndWait(new Task() + run(new Task() { @Override - public Void call() + public Void execute() { - task.run(); + task.execute(); return null; } }); } - public T submitAndWait(Task task) throws CancellationException + private static class ExceptionTaskWrapper implements Task + { + private final TaskWithException _underlying; + private E _exception; + + private ExceptionTaskWrapper(final TaskWithException underlying) + { + _underlying = underlying; + } + + + @Override + public T execute() + { + try + { + return _underlying.execute(); + } + catch (Exception e) + { + _exception = (E) e; + return null; + } + } + + E getException() + { + return _exception; + } + } + + + private static class ExceptionVoidTaskWrapper implements Task + { + private final VoidTaskWithException _underlying; + private E _exception; + + private ExceptionVoidTaskWrapper(final VoidTaskWithException underlying) + { + _underlying = underlying; + } + + + @Override + public Void execute() + { + try + { + _underlying.execute(); + + } + catch (Exception e) + { + _exception = (E) e; + } + return null; + } + + E getException() + { + return _exception; + } + } + + public T run(TaskWithException task) throws CancellationException, E + { + ExceptionTaskWrapper wrapper = new ExceptionTaskWrapper(task); + T result = run(wrapper); + if(wrapper.getException() != null) + { + throw wrapper.getException(); + } + else + { + return result; + } + } + + + public void run(VoidTaskWithException task) throws CancellationException, E + { + ExceptionVoidTaskWrapper wrapper = new ExceptionVoidTaskWrapper(task); + run(wrapper); + if(wrapper.getException() != null) + { + throw wrapper.getException(); + } + } + + public T run(Task task) throws CancellationException { try { @@ -207,7 +312,7 @@ public class TaskExecutor { LOGGER.debug("Performing task " + userTask); } - T result = userTask.call(); + T result = userTask.execute(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Task " + userTask + " is performed successfully with result:" + result); @@ -215,7 +320,7 @@ public class TaskExecutor return result; } - private class CallableWrapper implements Task + private class CallableWrapper implements Callable { private Task _userTask; private Subject _contextSubject; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 03b19608af..6a9b6df36c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -48,10 +48,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.ChangeAttributesTask; -import org.apache.qpid.server.configuration.updater.ChangeStateTask; -import org.apache.qpid.server.configuration.updater.CreateChildTask; -import org.apache.qpid.server.configuration.updater.SetAttributeTask; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -544,23 +540,25 @@ public abstract class AbstractConfiguredObject> im public final State setDesiredState(final State currentState, final State desiredState) throws IllegalStateTransitionException, AccessControlException { - if (_taskExecutor.isTaskExecutorThread()) - { - authoriseSetDesiredState(currentState, desiredState); - if (setState(currentState, desiredState)) - { - notifyStateChanged(currentState, desiredState); - return desiredState; - } - else - { - return getState(); - } - } - else - { - return _taskExecutor.submitAndWait(new ChangeStateTask(this, currentState, desiredState)); - } + + + return runTask(new TaskExecutor.Task() + { + @Override + public State execute() + { + authoriseSetDesiredState(currentState, desiredState); + if (setState(currentState, desiredState)) + { + notifyStateChanged(currentState, desiredState); + return desiredState; + } + else + { + return getState(); + } + } + }); } /** @@ -727,25 +725,25 @@ public abstract class AbstractConfiguredObject> im public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { - if (_taskExecutor.isTaskExecutorThread()) + return _taskExecutor.run(new TaskExecutor.Task() { - authoriseSetAttributes(createProxyForValidation(Collections.singletonMap(name, desired)), - Collections.singleton(name)); - - if (changeAttribute(name, expected, desired)) - { - attributeSet(name, expected, desired); - return desired; - } - else + @Override + public Object execute() { - return getAttribute(name); + authoriseSetAttributes(createProxyForValidation(Collections.singletonMap(name, desired)), + Collections.singleton(name)); + + if (changeAttribute(name, expected, desired)) + { + attributeSet(name, expected, desired); + return desired; + } + else + { + return getAttribute(name); + } } - } - else - { - return _taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired)); - } + }); } protected boolean changeAttribute(final String name, final Object expected, final Object desired) @@ -864,22 +862,23 @@ public abstract class AbstractConfiguredObject> im @SuppressWarnings("unchecked") @Override - public C createChild(Class childClass, Map attributes, ConfiguredObject... otherParents) + public C createChild(final Class childClass, final Map attributes, + final ConfiguredObject... otherParents) { - if (_taskExecutor.isTaskExecutorThread()) - { - authoriseCreateChild(childClass, attributes, otherParents); - C child = addChild(childClass, attributes, otherParents); - if (child != null) + return _taskExecutor.run(new TaskExecutor.Task() { + + @Override + public C execute() { - childAdded(child); + authoriseCreateChild(childClass, attributes, otherParents); + C child = addChild(childClass, attributes, otherParents); + if (child != null) + { + childAdded(child); + } + return child; } - return child; - } - else - { - return (C)_taskExecutor.submitAndWait(new CreateChildTask(this, childClass, attributes, otherParents)); - } + }); } protected C addChild(Class childClass, Map attributes, ConfiguredObject... otherParents) @@ -973,18 +972,39 @@ public abstract class AbstractConfiguredObject> im return _taskExecutor; } + protected final C runTask(TaskExecutor.Task task) + { + return _taskExecutor.run(task); + } + + protected void runTask(TaskExecutor.VoidTask task) + { + _taskExecutor.run(task); + } + + protected final T runTask(TaskExecutor.TaskWithException task) throws E + { + return _taskExecutor.run(task); + } + + protected final void runTask(TaskExecutor.VoidTaskWithException task) throws E + { + _taskExecutor.run(task); + } + + @Override public void setAttributes(final Map attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { - if (getTaskExecutor().isTaskExecutorThread()) + runTask(new TaskExecutor.VoidTask() { - authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); - changeAttributes(attributes); - } - else - { - getTaskExecutor().submitAndWait(new ChangeAttributesTask(this, attributes)); - } + @Override + public void execute() + { + authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet()); + changeAttributes(attributes); + } + }); } protected void authoriseSetAttributes(final ConfiguredObject proxyForValidation, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index dbf138b8a2..a634d25c14 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -30,7 +30,6 @@ import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @ManagedObject( defaultType = "broker" ) public interface Broker> extends ConfiguredObject, EventLoggerProvider, StatisticsGatherer @@ -178,11 +177,6 @@ public interface Broker> extends ConfiguredObject, EventL Collection> getTrustStores(); - /* - * TODO: Remove this method. Eventually the broker will become a registry. - */ - VirtualHostRegistry getVirtualHostRegistry(); - TaskExecutor getTaskExecutor(); boolean isManagementMode(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java index 68ca0f76c6..3ead1699b1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java @@ -84,121 +84,138 @@ public class SystemContextImpl extends AbstractConfiguredObject> resolvedObjects = new HashMap>(); - resolvedObjects.put(getId(), this); - - Collection recordsWithUnresolvedParents = new ArrayList(Arrays.asList(records)); - Collection> recordsWithUnresolvedDependencies = - new ArrayList>(); - - boolean updatesMade; - - do + runTask(new TaskExecutor.VoidTask() { - updatesMade = false; - Iterator iter = recordsWithUnresolvedParents.iterator(); - while (iter.hasNext()) + @Override + public void execute() { - ConfiguredObjectRecord record = iter.next(); - Collection> parents = new ArrayList>(); - boolean foundParents = true; - for (ConfiguredObjectRecord parent : record.getParents().values()) - { - if (!resolvedObjects.containsKey(parent.getId())) - { - foundParents = false; - break; - } - else - { - parents.add(resolvedObjects.get(parent.getId())); - } - } - if (foundParents) - { - iter.remove(); - UnresolvedConfiguredObject recovered = - factory.recover(record, parents.toArray(new ConfiguredObject[parents.size()])); - Collection> dependencies = - recovered.getUnresolvedDependencies(); - if (dependencies.isEmpty()) - { - updatesMade = true; - ConfiguredObject resolved = recovered.resolve(); - resolvedObjects.put(resolved.getId(), resolved); - } - else - { - recordsWithUnresolvedDependencies.add(recovered); - } - } - } - Iterator> unresolvedIter = - recordsWithUnresolvedDependencies.iterator(); + ConfiguredObjectFactory factory = getObjectFactory(); - while(unresolvedIter.hasNext()) - { - UnresolvedConfiguredObject unresolvedObject = unresolvedIter.next(); - Collection> dependencies = - new ArrayList>(unresolvedObject.getUnresolvedDependencies()); + Map> resolvedObjects = new HashMap>(); + resolvedObjects.put(getId(), SystemContextImpl.this); + + Collection recordsWithUnresolvedParents = + new ArrayList(Arrays.asList(records)); + Collection> recordsWithUnresolvedDependencies = + new ArrayList>(); - for(ConfiguredObjectDependency dependency : dependencies) + boolean updatesMade; + + do { - if(dependency instanceof ConfiguredObjectIdDependency) + updatesMade = false; + Iterator iter = recordsWithUnresolvedParents.iterator(); + while (iter.hasNext()) { - UUID id = ((ConfiguredObjectIdDependency)dependency).getId(); - if(resolvedObjects.containsKey(id)) + ConfiguredObjectRecord record = iter.next(); + Collection> parents = new ArrayList>(); + boolean foundParents = true; + for (ConfiguredObjectRecord parent : record.getParents().values()) + { + if (!resolvedObjects.containsKey(parent.getId())) + { + foundParents = false; + break; + } + else + { + parents.add(resolvedObjects.get(parent.getId())); + } + } + if (foundParents) { - dependency.resolve(resolvedObjects.get(id)); + iter.remove(); + UnresolvedConfiguredObject recovered = + factory.recover(record, parents.toArray(new ConfiguredObject[parents.size()])); + Collection> dependencies = + recovered.getUnresolvedDependencies(); + if (dependencies.isEmpty()) + { + updatesMade = true; + ConfiguredObject resolved = recovered.resolve(); + resolvedObjects.put(resolved.getId(), resolved); + } + else + { + recordsWithUnresolvedDependencies.add(recovered); + } } + } - else if(dependency instanceof ConfiguredObjectNameDependency) + + Iterator> unresolvedIter = + recordsWithUnresolvedDependencies.iterator(); + + while (unresolvedIter.hasNext()) { - ConfiguredObject dependentObject = null; - for(ConfiguredObject parent : unresolvedObject.getParents()) + UnresolvedConfiguredObject unresolvedObject = unresolvedIter.next(); + Collection> dependencies = + new ArrayList>(unresolvedObject.getUnresolvedDependencies()); + + for (ConfiguredObjectDependency dependency : dependencies) { - dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName()); - if(dependentObject != null) + if (dependency instanceof ConfiguredObjectIdDependency) { - break; + UUID id = ((ConfiguredObjectIdDependency) dependency).getId(); + if (resolvedObjects.containsKey(id)) + { + dependency.resolve(resolvedObjects.get(id)); + } + } + else if (dependency instanceof ConfiguredObjectNameDependency) + { + ConfiguredObject dependentObject = null; + for (ConfiguredObject parent : unresolvedObject.getParents()) + { + dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), + ((ConfiguredObjectNameDependency) dependency) + .getName() + ); + if (dependentObject != null) + { + break; + } + } + if (dependentObject != null) + { + dependency.resolve(dependentObject); + } + } + else + { + throw new ServerScopedRuntimeException("Unknown dependency type " + + dependency.getClass() + .getSimpleName()); } } - if(dependentObject != null) + if (unresolvedObject.getUnresolvedDependencies().isEmpty()) { - dependency.resolve(dependentObject); + updatesMade = true; + unresolvedIter.remove(); + ConfiguredObject resolved = unresolvedObject.resolve(); + resolvedObjects.put(resolved.getId(), resolved); } } - else - { - throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName()); - } + + } while (updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() + && recordsWithUnresolvedParents.isEmpty())); + + if (!recordsWithUnresolvedDependencies.isEmpty()) + { + throw new IllegalArgumentException("Cannot resolve some objects: " + + recordsWithUnresolvedDependencies); } - if(unresolvedObject.getUnresolvedDependencies().isEmpty()) + if (!recordsWithUnresolvedParents.isEmpty()) { - updatesMade = true; - unresolvedIter.remove(); - ConfiguredObject resolved = unresolvedObject.resolve(); - resolvedObjects.put(resolved.getId(), resolved); + throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + + recordsWithUnresolvedParents); } } - - } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty())); - - if(!recordsWithUnresolvedDependencies.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies); - } - if(!recordsWithUnresolvedParents.isEmpty()) - { - throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents); - } + }); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index c9954467e4..28553412f7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -24,9 +24,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.AccessControlException; import java.security.PrivilegedAction; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -59,7 +57,6 @@ import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.util.SystemUtils; public class BrokerAdapter extends AbstractConfiguredObject implements Broker, ConfigurationChangeListener, StatisticsGatherer, StatisticsGatherer.Source @@ -77,18 +74,10 @@ public class BrokerAdapter extends AbstractConfiguredObject imple private EventLogger _eventLogger; - private final VirtualHostRegistry _virtualHostRegistry; + //private final VirtualHostRegistry _virtualHostRegistry; private final LogRecorder _logRecorder; - private final Map> _vhostAdapters = new HashMap>(); - private final Map> _portAdapters = new HashMap>(); private final Map _stillInUsePortNumbers = new HashMap(); - private final Map> _authenticationProviders = new HashMap>(); - private final Map> _groupProviders = new HashMap>(); - private final Map> _plugins = new HashMap>(); - private final Map> _keyStores = new HashMap>(); - private final Map> _trustStores = new HashMap>(); - private final Map> _accessControlProviders = new HashMap>(); private final SecurityManager _securityManager; @@ -122,7 +111,7 @@ public class BrokerAdapter extends AbstractConfiguredObject imple parent.getTaskExecutor()); _objectFactory = parent.getObjectFactory(); - _virtualHostRegistry = new VirtualHostRegistry(parent.getEventLogger()); + //_virtualHostRegistry = new VirtualHostRegistry(parent.getEventLogger()); _logRecorder = parent.getLogRecorder(); _eventLogger = parent.getEventLogger(); @@ -220,7 +209,6 @@ public class BrokerAdapter extends AbstractConfiguredObject imple { _managementModeAuthenticationProvider.open(); } - _virtualHostRegistry.setDefaultVirtualHostName(getDefaultVirtualHost()); for(KeyStore keyStore : getChildren(KeyStore.class)) { @@ -375,26 +363,20 @@ public class BrokerAdapter extends AbstractConfiguredObject imple public Collection> getVirtualHosts() { - synchronized(_vhostAdapters) - { - return new ArrayList>(_vhostAdapters.values()); - } + Collection children = getChildren(VirtualHost.class); + return children; } public Collection> getPorts() { - synchronized (_portAdapters) - { - return new ArrayList>(_portAdapters.values()); - } + Collection children = getChildren(Port.class); + return children; } public Collection> getAuthenticationProviders() { - synchronized (_authenticationProviders) - { - return new ArrayList>(_authenticationProviders.values()); - } + Collection children = getChildren(AuthenticationProvider.class); + return children; } public AuthenticationProvider findAuthenticationProviderByName(String authenticationProviderName) @@ -403,40 +385,24 @@ public class BrokerAdapter extends AbstractConfiguredObject imple { return _managementModeAuthenticationProvider; } - Collection> providers = getAuthenticationProviders(); - for (AuthenticationProvider authenticationProvider : providers) - { - if (authenticationProvider.getName().equals(authenticationProviderName)) - { - return authenticationProvider; - } - } - return null; + return getChildByName(AuthenticationProvider.class, authenticationProviderName); } public KeyStore findKeyStoreByName(String keyStoreName) { - synchronized(_keyStores) - { - return _keyStores.get(keyStoreName); - } + return getChildByName(KeyStore.class, keyStoreName); } public TrustStore findTrustStoreByName(String trustStoreName) { - synchronized(_trustStores) - { - return _trustStores.get(trustStoreName); - } + return getChildByName(TrustStore.class, trustStoreName); } @Override public Collection> getGroupProviders() { - synchronized (_groupProviders) - { - return new ArrayList>(_groupProviders.values()); - } + Collection children = getChildren(GroupProvider.class); + return children; } private VirtualHost createVirtualHost(final Map attributes) @@ -462,10 +428,6 @@ public class BrokerAdapter extends AbstractConfiguredObject imple private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException { - synchronized (_vhostAdapters) - { - _vhostAdapters.remove(vhost.getName()); - } vhost.removeChangeListener(this); return true; } @@ -476,18 +438,6 @@ public class BrokerAdapter extends AbstractConfiguredObject imple return null; //TODO } - public long getTimeToLive() - { - return 0; - } - - public long setTimeToLive(final long expected, final long desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException - { - throw new IllegalStateException(); - } - - @Override public long getBytesIn() { @@ -514,40 +464,48 @@ public class BrokerAdapter extends AbstractConfiguredObject imple @SuppressWarnings("unchecked") @Override - public C addChild(Class childClass, Map attributes, ConfiguredObject... otherParents) + public C addChild(final Class childClass, final Map attributes, final ConfiguredObject... otherParents) { - if(childClass == VirtualHost.class) - { - return (C) createVirtualHost(attributes); - } - else if(childClass == Port.class) + return runTask( new TaskExecutor.Task() { - return (C) createPort(attributes); - } - else if(childClass == AccessControlProvider.class) - { - return (C) createAccessControlProvider(attributes); - } - else if(childClass == AuthenticationProvider.class) - { - return (C) createAuthenticationProvider(attributes); - } - else if(childClass == KeyStore.class) - { - return (C) createKeyStore(attributes); - } - else if(childClass == TrustStore.class) - { - return (C) createTrustStore(attributes); - } - else if(childClass == GroupProvider.class) - { - return (C) createGroupProvider(attributes); - } - else - { - throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName()); - } + @Override + public C execute() + { + if (childClass == VirtualHost.class) + { + return (C) createVirtualHost(attributes); + } + else if (childClass == Port.class) + { + return (C) createPort(attributes); + } + else if (childClass == AccessControlProvider.class) + { + return (C) createAccessControlProvider(attributes); + } + else if (childClass == AuthenticationProvider.class) + { + return (C) createAuthenticationProvider(attributes); + } + else if (childClass == KeyStore.class) + { + return (C) createKeyStore(attributes); + } + else if (childClass == TrustStore.class) + { + return (C) createTrustStore(attributes); + } + else if (childClass == GroupProvider.class) + { + return (C) createGroupProvider(attributes); + } + else + { + throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName()); + } + } + }); + } /** @@ -569,101 +527,77 @@ public class BrokerAdapter extends AbstractConfiguredObject imple return port; } - private void addPort(Port port) + private void addPort(final Port port) { - synchronized (_portAdapters) - { - int portNumber = port.getPort(); - String portName = port.getName(); - UUID portId = port.getId(); + assert getTaskExecutor().isTaskExecutorThread(); - for(Port p : _portAdapters.values()) - { - if(portNumber == p.getPort()) - { - throw new IllegalConfigurationException("Can't add port " + portName + " because port number " + portNumber + " is already configured for port " + p.getName()); - } - - if(portName.equals(p.getName())) - { - throw new IllegalConfigurationException("Can't add Port because one with name " + portName + " already exists"); - } + int portNumber = port.getPort(); + String portName = port.getName(); - if(portId.equals(p.getId())) + for (Port p : getChildren(Port.class)) + { + if(p != port) + { + if (portNumber == p.getPort()) { - throw new IllegalConfigurationException("Can't add Port because one with id " + portId + " already exists"); + throw new IllegalConfigurationException("Can't add port " + + portName + + " because port number " + + portNumber + + " is already configured for port " + + p.getName()); } } - - _portAdapters.put(port.getId(), port); } + port.addChangeListener(this); + } - private AccessControlProvider createAccessControlProvider(Map attributes) + private AccessControlProvider createAccessControlProvider(final Map attributes) { - AccessControlProvider accessControlProvider; - synchronized (_accessControlProviders) - { - accessControlProvider = (AccessControlProvider) createChild(AccessControlProvider.class, attributes); - addAccessControlProvider(accessControlProvider); - } + assert getTaskExecutor().isTaskExecutorThread(); + + AccessControlProvider accessControlProvider = (AccessControlProvider) createChild(AccessControlProvider.class, attributes); + addAccessControlProvider(accessControlProvider); - boolean quiesce = isManagementMode() ; + boolean quiesce = isManagementMode(); accessControlProvider.setDesiredState(State.INITIALISING, quiesce ? State.QUIESCED : State.ACTIVE); return accessControlProvider; + } - /** - * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists - */ - private void addAccessControlProvider(AccessControlProvider accessControlProvider) + private void addAccessControlProvider(final AccessControlProvider accessControlProvider) { - String name = accessControlProvider.getName(); - synchronized (_accessControlProviders) - { - if (_accessControlProviders.containsKey(accessControlProvider.getId())) - { - throw new IllegalConfigurationException("Can't add AccessControlProvider because one with id " + accessControlProvider.getId() + " already exists"); - } - for (AccessControlProvider provider : _accessControlProviders.values()) - { - if (provider.getName().equals(name)) - { - throw new IllegalConfigurationException("Can't add AccessControlProvider because one with name " + name + " already exists"); - } - } - _accessControlProviders.put(accessControlProvider.getId(), accessControlProvider); - } + assert getTaskExecutor().isTaskExecutorThread(); accessControlProvider.addChangeListener(this); accessControlProvider.addChangeListener(_securityManager); + } private boolean deleteAccessControlProvider(AccessControlProvider accessControlProvider) { - AccessControlProvider removedAccessControlProvider; - synchronized (_accessControlProviders) - { - removedAccessControlProvider = _accessControlProviders.remove(accessControlProvider.getId()); - } - - if(removedAccessControlProvider != null) - { - removedAccessControlProvider.removeChangeListener(this); - removedAccessControlProvider.removeChangeListener(_securityManager); - } + accessControlProvider.removeChangeListener(this); + accessControlProvider.removeChangeListener(_securityManager); - return removedAccessControlProvider != null; + return true; } - private AuthenticationProvider createAuthenticationProvider(Map attributes) + private AuthenticationProvider createAuthenticationProvider(final Map attributes) { - AuthenticationProvider authenticationProvider = createChild(AuthenticationProvider.class, attributes); - addAuthenticationProvider(authenticationProvider); - authenticationProvider.setDesiredState(State.INITIALISING, State.ACTIVE); - return authenticationProvider; + return runTask(new TaskExecutor.Task() + { + @Override + public AuthenticationProvider execute() + { + AuthenticationProvider authenticationProvider = createChild(AuthenticationProvider.class, attributes); + addAuthenticationProvider(authenticationProvider); + authenticationProvider.setDesiredState(State.INITIALISING, State.ACTIVE); + return authenticationProvider; + } + }); } private X createChild(Class clazz, Map attributes) @@ -685,61 +619,35 @@ public class BrokerAdapter extends AbstractConfiguredObject imple */ private void addAuthenticationProvider(AuthenticationProvider authenticationProvider) { - String name = authenticationProvider.getName(); - synchronized (_authenticationProviders) - { - if (_authenticationProviders.containsKey(authenticationProvider.getId())) - { - throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with id " + authenticationProvider.getId() + " already exists"); - } - for (AuthenticationProvider provider : _authenticationProviders.values()) - { - if (provider.getName().equals(name)) - { - throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists"); - } - } - _authenticationProviders.put(authenticationProvider.getId(), authenticationProvider); - } + assert getTaskExecutor().isTaskExecutorThread(); + authenticationProvider.addChangeListener(this); } - private GroupProvider createGroupProvider(Map attributes) + private GroupProvider createGroupProvider(final Map attributes) { - GroupProvider groupProvider = createChild(GroupProvider.class, attributes); - addGroupProvider(groupProvider); - groupProvider.setDesiredState(State.INITIALISING, State.ACTIVE); - return groupProvider; + return runTask(new TaskExecutor.Task>() + { + @Override + public GroupProvider execute() + { + GroupProvider groupProvider = createChild(GroupProvider.class, attributes); + addGroupProvider(groupProvider); + groupProvider.setDesiredState(State.INITIALISING, State.ACTIVE); + return groupProvider; + } + }); } private void addGroupProvider(GroupProvider groupProvider) { - synchronized (_groupProviders) - { - String name = groupProvider.getName(); - if(_groupProviders.containsKey(name)) - { - throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists"); - } - _groupProviders.put(name, groupProvider); - } groupProvider.addChangeListener(this); } private boolean deleteGroupProvider(GroupProvider groupProvider) { - GroupProvider removedGroupProvider = null; - synchronized (_groupProviders) - { - removedGroupProvider = _groupProviders.remove(groupProvider.getName()); - } - - if(removedGroupProvider != null) - { - removedGroupProvider.removeChangeListener(this); - } - - return removedGroupProvider != null; + groupProvider.removeChangeListener(this); + return true; } private KeyStore createKeyStore(Map attributes) @@ -760,58 +668,25 @@ public class BrokerAdapter extends AbstractConfiguredObject imple private void addKeyStore(KeyStore keyStore) { - synchronized (_keyStores) - { - if(_keyStores.containsKey(keyStore.getName())) - { - throw new IllegalConfigurationException("Can't add KeyStore because one with name " + keyStore.getName() + " already exists"); - } - _keyStores.put(keyStore.getName(), keyStore); - } keyStore.addChangeListener(this); } - private boolean deleteKeyStore(KeyStore object) + private boolean deleteKeyStore(KeyStore keyStore) { - synchronized(_keyStores) - { - String name = object.getName(); - KeyStore removedKeyStore = _keyStores.remove(name); - if(removedKeyStore != null) - { - removedKeyStore.removeChangeListener(this); - } - - return removedKeyStore != null; - } + keyStore.removeChangeListener(this); + return true; } private void addTrustStore(TrustStore trustStore) { - synchronized (_trustStores) - { - if(_trustStores.containsKey(trustStore.getName())) - { - throw new IllegalConfigurationException("Can't add TrustStore because one with name " + trustStore.getName() + " already exists"); - } - _trustStores.put(trustStore.getName(), trustStore); - } trustStore.addChangeListener(this); } - private boolean deleteTrustStore(TrustStore object) + private boolean deleteTrustStore(TrustStore trustStore) { - synchronized(_trustStores) - { - String name = object.getName(); - TrustStore removedTrustStore = _trustStores.remove(name); - if(removedTrustStore != null) - { - removedTrustStore.removeChangeListener(this); - } + trustStore.removeChangeListener(this); + return true; - return removedTrustStore != null; - } } @Override @@ -824,56 +699,36 @@ public class BrokerAdapter extends AbstractConfiguredObject imple return super.getAttribute(name); } - private boolean deletePort(State oldState, Port portAdapter) + private boolean deletePort(State oldState, Port port) { - Port removedPort; - synchronized (_portAdapters) - { - removedPort = _portAdapters.remove(portAdapter.getId()); - } + port.removeChangeListener(this); - if (removedPort != null) - { - removedPort.removeChangeListener(this); + // TODO - this seems suspicious, wouldn't it make more sense to not allow deletion from active + // (must be stopped first) or something? - if(oldState == State.ACTIVE) - { - //Record the originally used port numbers of previously-active ports being deleted, to ensure - //when creating new ports we don't try to re-bind a port number that we are currently still using - recordPreviouslyUsedPortNumberIfNecessary(removedPort, removedPort.getPort()); - } + if(oldState == State.ACTIVE) + { + //Record the originally used port numbers of previously-active ports being deleted, to ensure + //when creating new ports we don't try to re-bind a port number that we are currently still using + recordPreviouslyUsedPortNumberIfNecessary(port, port.getPort()); } - return removedPort != null; + + return port != null; } private boolean deleteAuthenticationProvider(AuthenticationProvider authenticationProvider) { - AuthenticationProvider removedAuthenticationProvider; - synchronized (_authenticationProviders) - { - removedAuthenticationProvider = _authenticationProviders.remove(authenticationProvider.getId()); - } - - if(removedAuthenticationProvider != null) + if(authenticationProvider != null) { - removedAuthenticationProvider.removeChangeListener(this); + authenticationProvider.removeChangeListener(this); } - - return removedAuthenticationProvider != null; + return true; } private void addVirtualHost(VirtualHost virtualHost) { - synchronized (_vhostAdapters) - { - String name = virtualHost.getName(); - if (_vhostAdapters.containsKey(name)) - { - throw new IllegalConfigurationException("Virtual host with name " + name + " is already specified!"); - } - _vhostAdapters.put(name, virtualHost); - } + virtualHost.addChangeListener(this); } @@ -883,16 +738,7 @@ public class BrokerAdapter extends AbstractConfiguredObject imple if (desiredState == State.ACTIVE) { initialiseStatisticsReporting(); - changeState(_groupProviders, currentState, State.ACTIVE, false); - changeState(_authenticationProviders, currentState, State.ACTIVE, false); - changeState(_accessControlProviders, currentState, State.ACTIVE, false); - - - changeState(_vhostAdapters, currentState, State.ACTIVE, false); - - changeState(_portAdapters, currentState,State.ACTIVE, false); - changeState(_plugins, currentState,State.ACTIVE, false); - + changeChildState(currentState, State.ACTIVE, false); if (isManagementMode()) { _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, @@ -908,49 +754,53 @@ public class BrokerAdapter extends AbstractConfiguredObject imple _reportingTimer.cancel(); } - changeState(_plugins, currentState,State.STOPPED, true); - changeState(_portAdapters, currentState, State.STOPPED, true); - changeState(_vhostAdapters,currentState, State.STOPPED, true); - changeState(_authenticationProviders, currentState, State.STOPPED, true); - changeState(_groupProviders, currentState, State.STOPPED, true); - _virtualHostRegistry.close(); + changeChildState(currentState, State.STOPPED, true); return true; } return false; } - private void changeState(Map configuredObjectMap, State currentState, State desiredState, boolean swallowException) + private void changeChildState(final State currentState, + final State desiredState, + final boolean swallowException) { - synchronized(configuredObjectMap) + runTask(new TaskExecutor.VoidTask() { - Collection adapters = configuredObjectMap.values(); - for (ConfiguredObject configuredObject : adapters) + @Override + public void execute() { - if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getState())) + for (Class clazz : Model.getInstance().getChildTypes(getCategoryClass())) { - if (LOGGER.isDebugEnabled()) + for (ConfiguredObject configuredObject : getChildren(clazz)) { - LOGGER.debug(configuredObject + " cannot be activated as it is " +State.QUIESCED); - } - continue; - } - try - { - configuredObject.setDesiredState(currentState, desiredState); - } - catch(RuntimeException e) - { - if (swallowException) - { - LOGGER.error("Failed to stop " + configuredObject, e); - } - else - { - throw e; + if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getState())) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(configuredObject + " cannot be activated as it is " + State.QUIESCED); + } + continue; + } + try + { + configuredObject.setDesiredState(currentState, desiredState); + } + catch (RuntimeException e) + { + if (swallowException) + { + LOGGER.error("Failed to stop " + configuredObject, e); + } + else + { + throw e; + } + } } } } - } + }); + } @Override @@ -1023,24 +873,14 @@ public class BrokerAdapter extends AbstractConfiguredObject imple private void addPlugin(ConfiguredObject plugin) { - synchronized(_plugins) - { - if (_plugins.containsKey(plugin.getId())) - { - throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!"); - } - _plugins.put(plugin.getId(), plugin); - } plugin.addChangeListener(this); } private Collection> getPlugins() { - synchronized(_plugins) - { - return Collections.unmodifiableCollection(_plugins.values()); - } + Collection children = getChildren(Plugin.class); + return children; } @Override @@ -1058,7 +898,7 @@ public class BrokerAdapter extends AbstractConfiguredObject imple @Override public VirtualHost findVirtualHostByName(String name) { - return _vhostAdapters.get(name); + return getChildByName(VirtualHost.class, name); } @Override @@ -1094,31 +934,15 @@ public class BrokerAdapter extends AbstractConfiguredObject imple @Override public Collection> getKeyStores() { - synchronized(_keyStores) - { - return Collections.unmodifiableCollection(_keyStores.values()); - } + Collection children = getChildren(KeyStore.class); + return children; } @Override public Collection> getTrustStores() { - synchronized(_trustStores) - { - return Collections.unmodifiableCollection(_trustStores.values()); - } - } - - @Override - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - @Override - public TaskExecutor getTaskExecutor() - { - return super.getTaskExecutor(); + Collection children = getChildren(TrustStore.class); + return children; } @Override @@ -1149,10 +973,8 @@ public class BrokerAdapter extends AbstractConfiguredObject imple @Override public Collection> getAccessControlProviders() { - synchronized (_accessControlProviders) - { - return new ArrayList>(_accessControlProviders.values()); - } + Collection children = getChildren(AccessControlProvider.class); + return children; } private void recordPreviouslyUsedPortNumberIfNecessary(Port port, Integer portNumber) @@ -1232,9 +1054,12 @@ public class BrokerAdapter extends AbstractConfiguredObject imple _messagesReceived.reset(); _dataReceived.reset(); - for (VirtualHostImpl vhost : _virtualHostRegistry.getVirtualHosts()) + for (VirtualHost vhost : getVirtualHosts()) { - vhost.resetStatistics(); + if(vhost instanceof VirtualHostImpl) + { + ((VirtualHostImpl) vhost).resetStatistics(); + } } } @@ -1285,27 +1110,37 @@ public class BrokerAdapter extends AbstractConfiguredObject imple _eventLogger.message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - Collection hosts = _virtualHostRegistry.getVirtualHosts(); + Collection> hosts = getVirtualHosts(); - if (hosts.size() > 1) - { - for (VirtualHostImpl vhost : hosts) + for (VirtualHost vhost : hosts) { - String name = vhost.getName(); - StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - EventLogger logger = vhost.getEventLogger(); - logger.message(VirtualHostMessages.STATS_DATA(name, - DELIVERED, - dataDelivered.getPeak() / 1024.0, - dataDelivered.getTotal())); - logger.message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - logger.message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - logger.message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); + if(vhost instanceof VirtualHostImpl) + { + VirtualHostImpl vhostImpl = (VirtualHostImpl) vhost; + String name = vhost.getName(); + StatisticsCounter dataDelivered = vhostImpl.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = vhostImpl.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = vhostImpl.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = vhostImpl.getMessageReceiptStatistics(); + EventLogger logger = vhostImpl.getEventLogger(); + logger.message(VirtualHostMessages.STATS_DATA(name, + DELIVERED, + dataDelivered.getPeak() / 1024.0, + dataDelivered.getTotal())); + logger.message(VirtualHostMessages.STATS_MSGS(name, + DELIVERED, + messagesDelivered.getPeak(), + messagesDelivered.getTotal())); + logger.message(VirtualHostMessages.STATS_DATA(name, + RECEIVED, + dataReceived.getPeak() / 1024.0, + dataReceived.getTotal())); + logger.message(VirtualHostMessages.STATS_MSGS(name, + RECEIVED, + messagesReceived.getPeak(), + messagesReceived.getTotal())); + } } - } if (_reset) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 45b02c7ab3..4121e24368 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -420,7 +421,7 @@ abstract public class AbstractPort> extends AbstractCo //ManagementMode needs this relaxed to allow its overriding management ports to be inserted. //Enforce only a single port of each management protocol, as the plugins will only use one. - Collection> existingPorts = broker.getPorts(); + Collection> existingPorts = new HashSet>(broker.getPorts()); existingPorts.remove(this); for (Port existingPort : existingPorts) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index a986cd8179..d694db264d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; @ManagedObject( category = false, type = "AMQP") public interface AmqpPort> extends Port @@ -53,4 +54,6 @@ public interface AmqpPort> extends Port @ManagedAttribute( automate = true, mandatory = true ) AuthenticationProvider getAuthenticationProvider(); + + VirtualHostImpl getVirtualHost(String name); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 8c6744fbf2..ad52f420b6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -41,11 +41,13 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.TransportProviderFactory; import org.apache.qpid.server.transport.AcceptingTransport; import org.apache.qpid.server.transport.TransportProvider; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; public class AmqpPortImpl extends AbstractPortWithAuthProvider implements AmqpPort @@ -89,6 +91,17 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider imp return _receiveBufferSize; } + @Override + public VirtualHostImpl getVirtualHost(String name) + { + // TODO - aliases + if(name == null || name.trim().length() == 0) + { + name = _broker.getDefaultVirtualHost(); + } + return (VirtualHostImpl) _broker.getChildByName(VirtualHost.class, name); + } + protected Set getDefaultProtocols() { Set defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java index 2800064298..5db250a90f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java @@ -45,7 +45,6 @@ import javax.xml.bind.DatatypeConverter; import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; import org.apache.qpid.server.configuration.RecovererProvider; -import org.apache.qpid.server.configuration.updater.ChangeAttributesTask; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -255,42 +254,34 @@ public class ScramSHA1AuthenticationManager @Override public boolean createUser(final String username, final String password, final Map attributes) { - if (getTaskExecutor().isTaskExecutorThread()) + return runTask(new TaskExecutor.Task() { - getSecurityManager().authoriseUserOperation(Operation.CREATE, username); - if(_users.containsKey(username)) + @Override + public Boolean execute() { - throw new IllegalArgumentException("User '"+username+"' already exists"); - } - try - { - Map userAttrs = new HashMap(); - userAttrs.put(User.ID, UUID.randomUUID()); - userAttrs.put(User.NAME, username); - userAttrs.put(User.PASSWORD, createStoredPassword(password)); - userAttrs.put(User.TYPE, SCRAM_USER_TYPE); - ScramAuthUser user = new ScramAuthUser(userAttrs, this); - user.create(); - - return true; - } - catch (SaslException e) - { - throw new IllegalArgumentException(e); - } - } - else - { - return getTaskExecutor().submitAndWait(new TaskExecutor.Task() - { - @Override - public Boolean call() + getSecurityManager().authoriseUserOperation(Operation.CREATE, username); + if (_users.containsKey(username)) { - return createUser(username, password, attributes); + throw new IllegalArgumentException("User '" + username + "' already exists"); } - }); - } - + try + { + Map userAttrs = new HashMap(); + userAttrs.put(User.ID, UUID.randomUUID()); + userAttrs.put(User.NAME, username); + userAttrs.put(User.PASSWORD, createStoredPassword(password)); + userAttrs.put(User.TYPE, SCRAM_USER_TYPE); + ScramAuthUser user = new ScramAuthUser(userAttrs, ScramSHA1AuthenticationManager.this); + user.create(); + + return true; + } + catch (SaslException e) + { + throw new IllegalArgumentException(e); + } + } + }); } private SecurityManager getSecurityManager() @@ -301,115 +292,64 @@ public class ScramSHA1AuthenticationManager @Override public void deleteUser(final String user) throws AccountNotFoundException { - if (getTaskExecutor().isTaskExecutorThread()) + runTask(new TaskExecutor.VoidTaskWithException() { - - final ScramAuthUser authUser = getUser(user); - if(authUser != null) - { - authUser.setState(State.ACTIVE, State.DELETED); - } - else + @Override + public void execute() throws AccountNotFoundException { - throw new AccountNotFoundException("No such user: '" + user + "'"); - } - } - else - { - AccountNotFoundException e = - getTaskExecutor().submitAndWait(new TaskExecutor.Task() { - - @Override - public AccountNotFoundException call() + final ScramAuthUser authUser = getUser(user); + if(authUser != null) { - try - { - deleteUser(user); - return null; - } - catch (AccountNotFoundException e) - { - return e; - } - + authUser.setState(State.ACTIVE, State.DELETED); + } + else + { + throw new AccountNotFoundException("No such user: '" + user + "'"); } - }); - - if(e != null) - { - throw e; } - } + }); } @Override public void setPassword(final String username, final String password) throws AccountNotFoundException { - if (getTaskExecutor().isTaskExecutorThread()) + runTask(new TaskExecutor.VoidTaskWithException() { - final ScramAuthUser authUser = getUser(username); - if(authUser != null) - { - authUser.setPassword(password); - } - else + @Override + public void execute() throws AccountNotFoundException { - throw new AccountNotFoundException("No such user: '" + username + "'"); - } - } - else - { - AccountNotFoundException e = - getTaskExecutor().submitAndWait(new TaskExecutor.Task() - { - - @Override - public AccountNotFoundException call() - { - try - { - setPassword(username, password); - return null; - } - catch (AccountNotFoundException e) - { - return e; - } - } - }); - - if (e != null) - { - throw e; + final ScramAuthUser authUser = getUser(username); + if (authUser != null) + { + authUser.setPassword(password); + } + else + { + throw new AccountNotFoundException("No such user: '" + username + "'"); + } } - } + }); } @Override public Map> getUsers() { - if (getTaskExecutor().isTaskExecutorThread()) + return runTask(new TaskExecutor.Task>>() { - Map> users = new HashMap>(); - for(String user : _users.keySet()) + @Override + public Map> execute() { - users.put(user, Collections.emptyMap()); - } - return users; - } - else - { - return getTaskExecutor().submitAndWait(new TaskExecutor.Task>>() - { - @Override - public Map> call() + + Map> users = new HashMap>(); + for (String user : _users.keySet()) { - return getUsers(); + users.put(user, Collections.emptyMap()); } - }); - } + return users; + } + }); } @Override @@ -491,27 +431,31 @@ public class ScramSHA1AuthenticationManager public void setAttributes(final Map attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { - if (getTaskExecutor().isTaskExecutorThread()) + runTask(new TaskExecutor.VoidTask() { - Map modifiedAttributes = new HashMap(attributes); - final String newPassword = (String) attributes.get(User.PASSWORD); - if(attributes.containsKey(User.PASSWORD) && !newPassword.equals(getActualAttributes().get(User.PASSWORD))) + + @Override + public void execute() { - try - { - modifiedAttributes.put(User.PASSWORD, _authenticationManager.createStoredPassword(newPassword)); - } - catch (SaslException e) + Map modifiedAttributes = new HashMap(attributes); + final String newPassword = (String) attributes.get(User.PASSWORD); + if (attributes.containsKey(User.PASSWORD) + && !newPassword.equals(getActualAttributes().get(User.PASSWORD))) { - throw new IllegalArgumentException(e); + try + { + modifiedAttributes.put(User.PASSWORD, + _authenticationManager.createStoredPassword(newPassword)); + } + catch (SaslException e) + { + throw new IllegalArgumentException(e); + } } + ScramSHA1AuthenticationManager.ScramAuthUser.super.setAttributes(modifiedAttributes); } - super.setAttributes(modifiedAttributes); - } - else - { - getTaskExecutor().submitAndWait(new ChangeAttributesTask(this, attributes)); - } + }); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 5189bc6fee..510c071dd3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -172,7 +172,7 @@ public abstract class AbstractVirtualHost> exte _broker = broker; _dtxRegistry = new DtxRegistry(); - _eventLogger = _broker.getVirtualHostRegistry().getEventLogger(); + _eventLogger = _broker.getParent(SystemContext.class).getEventLogger(); _eventLogger.message(VirtualHostMessages.CREATED(getName())); @@ -244,8 +244,6 @@ public abstract class AbstractVirtualHost> exte getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); - _broker.getVirtualHostRegistry().registerVirtualHost(this); - synchronized(_aliases) { @@ -869,11 +867,6 @@ public abstract class AbstractVirtualHost> exte return null; } - public VirtualHostRegistry getVirtualHostRegistry() - { - return _broker.getVirtualHostRegistry(); - } - public void registerMessageDelivered(long messageSize) { _messagesDelivered.registerEvent(1L); @@ -1448,15 +1441,7 @@ public abstract class AbstractVirtualHost> exte } else if (desiredState == State.STOPPED) { - try - { - close(); - } - finally - { - _broker.getVirtualHostRegistry().unregisterVirtualHost(this); - } - + close(); return true; } else if (desiredState == State.DELETED) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index d0737e8311..68469118f6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -108,8 +108,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl, Q extends AM int getHouseKeepingActiveCount(); - VirtualHostRegistry getVirtualHostRegistry(); - DtxRegistry getDtxRegistry(); LinkRegistry getLinkRegistry(String remoteContainerId); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java deleted file mode 100644 index cc9ff549c4..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.logging.EventLogger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -public class VirtualHostRegistry implements Closeable -{ - private final Map _registry = new ConcurrentHashMap(); - private String _defaultVirtualHostName; - private final EventLogger _eventLogger; - - - public VirtualHostRegistry(EventLogger eventLogger) - { - _eventLogger = eventLogger; - } - - public synchronized void registerVirtualHost(VirtualHostImpl host) - { - if(_registry.containsKey(host.getName())) - { - throw new IllegalArgumentException("Virtual Host with name " + host.getName() + " already registered."); - } - _registry.put(host.getName(),host); - } - - public synchronized void unregisterVirtualHost(VirtualHostImpl host) - { - _registry.remove(host.getName()); - } - - public VirtualHostImpl getVirtualHost(String name) - { - if(name == null || name.trim().length() == 0 || "/".equals(name.trim())) - { - name = getDefaultVirtualHostName(); - } - - return _registry.get(name); - } - - public VirtualHostImpl getDefaultVirtualHost() - { - return getVirtualHost(getDefaultVirtualHostName()); - } - - private String getDefaultVirtualHostName() - { - return _defaultVirtualHostName; - } - - public void setDefaultVirtualHostName(String defaultVirtualHostName) - { - _defaultVirtualHostName = defaultVirtualHostName; - } - - - public Collection getVirtualHosts() - { - return new ArrayList(_registry.values()); - } - - public void close() - { - for (VirtualHostImpl virtualHost : getVirtualHosts()) - { - virtualHost.close(); - } - } - - public EventLogger getEventLogger() - { - return _eventLogger; - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java index ea1d22f9ef..5831fe3caa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.configuration.startup; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,12 +39,12 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class VirtualHostCreationTest extends TestCase { @@ -53,13 +54,14 @@ public class VirtualHostCreationTest extends TestCase SecurityManager securityManager = mock(SecurityManager.class); ConfigurationEntry entry = mock(ConfigurationEntry.class); ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); + SystemContext systemContext = mock(SystemContext.class); + Broker parent = mock(Broker.class); when(parent.getObjectFactory()).thenReturn(objectFactory); when(parent.getSecurityManager()).thenReturn(securityManager); when(parent.getCategoryClass()).thenReturn(Broker.class); - VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class); - when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class)); - when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry); + when(systemContext.getEventLogger()).thenReturn(mock(EventLogger.class)); + when(parent.getParent(eq(SystemContext.class))).thenReturn(systemContext); Map attributes = new HashMap(); attributes.put(VirtualHost.NAME, getName()); @@ -90,11 +92,11 @@ public class VirtualHostCreationTest extends TestCase public void checkMandatoryAttributesAreValidated(String[] mandatoryAttributes, Map attributes) { SecurityManager securityManager = mock(SecurityManager.class); + SystemContext systemContext = mock(SystemContext.class); Broker parent = mock(Broker.class); when(parent.getSecurityManager()).thenReturn(securityManager); - VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class); - when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class)); - when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry); + when(parent.getParent(eq(SystemContext.class))).thenReturn(systemContext); + when(systemContext.getEventLogger()).thenReturn(mock(EventLogger.class)); for (String name : mandatoryAttributes) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java index 083160a31d..fb331c73c1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java @@ -135,10 +135,10 @@ public class TaskExecutorTest extends TestCase public void testSubmitAndWait() throws Exception { _executor.start(); - Object result = _executor.submitAndWait(new TaskExecutor.Task() + Object result = _executor.run(new TaskExecutor.Task() { @Override - public String call() + public String execute() { return "DONE"; } @@ -149,7 +149,7 @@ public class TaskExecutorTest extends TestCase public void testSubmitAndWaitInNotAuthorizedContext() { _executor.start(); - Object subject = _executor.submitAndWait(new SubjectRetriever()); + Object subject = _executor.run(new SubjectRetriever()); assertNull("Subject must be null", subject); } @@ -162,7 +162,7 @@ public class TaskExecutorTest extends TestCase @Override public Object run() { - return _executor.submitAndWait(new SubjectRetriever()); + return _executor.run(new SubjectRetriever()); } }); assertEquals("Unexpected subject", subject, result); @@ -176,7 +176,7 @@ public class TaskExecutorTest extends TestCase @Override public Object run() { - return _executor.submitAndWait(new SubjectRetriever()); + return _executor.run(new SubjectRetriever()); } }); assertEquals("Unexpected subject", null, result); @@ -188,11 +188,11 @@ public class TaskExecutorTest extends TestCase _executor.start(); try { - _executor.submitAndWait(new TaskExecutor.Task() + _executor.run(new TaskExecutor.Task() { @Override - public Void call() + public Void execute() { throw exception; } @@ -215,15 +215,15 @@ public class TaskExecutorTest extends TestCase @Override public Object run() { - _executor.submitAndWait(new TaskExecutor.Task() - { - @Override - public Void call() + _executor.run(new TaskExecutor.Task() { - taskSubject.set(Subject.getSubject(AccessController.getContext())); - return null; - } - }); + @Override + public Void execute() + { + taskSubject.set(Subject.getSubject(AccessController.getContext())); + return null; + } + }); return null; } }); @@ -234,7 +234,7 @@ public class TaskExecutorTest extends TestCase private class SubjectRetriever implements TaskExecutor.Task { @Override - public Subject call() + public Subject execute() { return Subject.getSubject(AccessController.getContext()); } @@ -251,7 +251,7 @@ public class TaskExecutorTest extends TestCase } @Override - public Void call() + public Void execute() { if (_waitLatch != null) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 064cfe651a..ec108a419d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -43,6 +43,7 @@ public class VirtualHostTest extends QpidTestCase private Broker _broker; private StatisticsGatherer _statisticsGatherer; private RecovererProvider _recovererProvider; + private TaskExecutor _taskExecutor; @Override protected void setUp() throws Exception @@ -50,14 +51,22 @@ public class VirtualHostTest extends QpidTestCase super.setUp(); _broker = BrokerTestHelper.createBrokerMock(); - TaskExecutor taskExecutor = mock(TaskExecutor.class); - when(taskExecutor.isTaskExecutorThread()).thenReturn(true); - when(_broker.getTaskExecutor()).thenReturn(taskExecutor); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); _recovererProvider = mock(RecovererProvider.class); _statisticsGatherer = mock(StatisticsGatherer.class); } + + @Override + public void tearDown() throws Exception + { + _taskExecutor.stopImmediately(); + super.tearDown(); + } + public void testInitialisingState() { VirtualHost host = createHost(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java index 9c6a840d76..d8ee851813 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java @@ -49,6 +49,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase private Broker _broker; private String _user1, _user2; private File _preferencesFile; + private TaskExecutor _taskExecutor; protected void setUp() throws Exception { @@ -60,9 +61,10 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase _preferencesFile = TestFileUtils.createTempFile(this, ".prefs.json", TEST_PREFERENCES); _broker = BrokerTestHelper.createBrokerMock(); - TaskExecutor taskExecutor = mock(TaskExecutor.class); - when(taskExecutor.isTaskExecutorThread()).thenReturn(true); - when(_broker.getTaskExecutor()).thenReturn(taskExecutor); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + when(_broker.getTaskExecutor()).thenReturn(_taskExecutor); + when(_authenticationProvider.getParent(Broker.class)).thenReturn(_broker); } @@ -76,6 +78,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase } BrokerTestHelper.tearDown(); _preferencesFile.delete(); + _taskExecutor.stopImmediately(); } finally { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 01f4ed4299..e350774772 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -62,7 +62,6 @@ import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class BrokerTestHelper { @@ -80,17 +79,22 @@ public class BrokerTestHelper { ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); SubjectCreator subjectCreator = mock(SubjectCreator.class); + when(subjectCreator.getMechanisms()).thenReturn(""); Broker broker = mock(Broker.class); when(broker.getConnection_sessionCountLimit()).thenReturn(1); when(broker.getConnection_closeWhenNoRoute()).thenReturn(false); when(broker.getId()).thenReturn(UUID.randomUUID()); when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); - when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger())); when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false)); when(broker.getObjectFactory()).thenReturn(objectFactory); when(broker.getEventLogger()).thenReturn(new EventLogger()); when(broker.getCategoryClass()).thenReturn(Broker.class); + + SystemContext systemContext = mock(SystemContext.class); + when(systemContext.getEventLogger()).thenReturn(new EventLogger()); + when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext); + return broker; } @@ -102,7 +106,7 @@ public class BrokerTestHelper { } - public static VirtualHostImpl createVirtualHost(VirtualHostRegistry virtualHostRegistry, Map attributes) + public static VirtualHostImpl createVirtualHost(Map attributes) throws Exception { @@ -115,7 +119,6 @@ public class BrokerTestHelper ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); Broker broker = mock(Broker.class); when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext); - when(broker.getVirtualHostRegistry()).thenReturn(virtualHostRegistry); when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR); SecurityManager securityManager = new SecurityManager(broker, false); when(broker.getSecurityManager()).thenReturn(securityManager); @@ -132,11 +135,6 @@ public class BrokerTestHelper } public static VirtualHostImpl createVirtualHost(String name) throws Exception - { - return createVirtualHost(name, new VirtualHostRegistry(new EventLogger())); - } - - public static VirtualHostImpl createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception { Map attributes = new HashMap(); attributes.put(org.apache.qpid.server.model.VirtualHost.TYPE, StandardVirtualHost.TYPE); @@ -147,7 +145,7 @@ public class BrokerTestHelper attributes.put(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); attributes.put(org.apache.qpid.server.model.VirtualHost.NAME, name); - return createVirtualHost(virtualHostRegistry, attributes); + return createVirtualHost(attributes); } public static AMQSessionModel createSession(int channelId, AMQConnectionModel connection) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 85eede527a..942781e7ba 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -66,12 +66,6 @@ public class MockVirtualHost implements VirtualHostImpl For each AMQProtocolHandler @@ -142,11 +139,6 @@ public class AMQStateManager implements AMQMethodListener } - public VirtualHostRegistry getVirtualHostRegistry() - { - return _broker.getVirtualHostRegistry(); - } - public AMQProtocolSession getProtocolSession() { return _protocolSession; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 031febbb1a..5b8d3a488e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; @@ -133,7 +134,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { host = _broker.getDefaultVirtualHost(); } - _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + + _vhost = ((AmqpPort)_port).getVirtualHost(host); if(_vhost == null) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index e674cef2d0..de2b594211 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -51,26 +50,16 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase super.setUp(); BrokerTestHelper.setUp(); _broker = BrokerTestHelper.createBrokerMock(); - VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry(); when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default"); when(_broker.getDefaultVirtualHost()).thenReturn("default"); - // AMQP 1-0 connection needs default vhost to be present - _virtualHost = BrokerTestHelper.createVirtualHost("default", virtualHostRegistry); } @Override protected void tearDown() throws Exception { - try - { - _virtualHost.close(); - } - finally - { BrokerTestHelper.tearDown(); super.tearDown(); - } } private static final byte[] AMQP_0_8_HEADER = diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index b1941f4d39..1b0f44af49 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -38,7 +38,6 @@ import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Binding; @@ -61,7 +60,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -180,7 +178,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase try { - _virtualHost = (AbstractVirtualHost) BrokerTestHelper.createVirtualHost(new VirtualHostRegistry(new EventLogger()), _attributes); + _virtualHost = (AbstractVirtualHost) BrokerTestHelper.createVirtualHost(_attributes); } catch (Exception e) { -- cgit v1.2.1