summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java62
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java67
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java78
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java123
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java136
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java199
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java583
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java212
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java19
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java101
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java4
30 files changed, 692 insertions, 1095 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index 3e23df6d87..5ad113f827 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -59,6 +59,7 @@ public class VirtualHostTest extends QpidTestCase
private File _bdbStorePath;
private VirtualHost<?,?,?> _host;
private ConfigurationEntryStore _store;
+ private TaskExecutor _taskExecutor;
@Override
protected void setUp() throws Exception
@@ -67,9 +68,9 @@ public class VirtualHostTest extends QpidTestCase
_store = mock(ConfigurationEntryStore.class);
_broker = BrokerTestHelper.createBrokerMock();
- TaskExecutor taslExecutor = mock(TaskExecutor.class);
- when(taslExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_broker.getTaskExecutor()).thenReturn(taslExecutor);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
_statisticsGatherer = mock(StatisticsGatherer.class);
@@ -89,6 +90,8 @@ public class VirtualHostTest extends QpidTestCase
}
finally
{
+ _taskExecutor.stopImmediately();
+
if (_bdbStorePath != null)
{
FileUtils.delete(_bdbStorePath, true);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index c1a5f92717..76826bfc56 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
@@ -255,26 +256,22 @@ public class BindingImpl
public void setArguments(final Map<String, Object> arguments)
{
- if(getTaskExecutor().isTaskExecutorThread())
- {
- _arguments = arguments;
- super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
- if (isDurable())
- {
- VirtualHostImpl<?, ?, ?> vhost = (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
- vhost.getDurableConfigurationStore().update(true, asObjectRecord());
- }
- }
- else
- {
- getTaskExecutor().submitAndWait(new Runnable()
- {
- @Override
- public void run()
+ runTask(new TaskExecutor.VoidTask()
{
- setArguments(arguments);
+ @Override
+ public void execute()
+ {
+ _arguments = arguments;
+ BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
+ if (isDurable())
+ {
+ VirtualHostImpl<?, ?, ?> vhost =
+ (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
+ vhost.getDurableConfigurationStore().update(true, asObjectRecord());
+ }
+ }
}
- });
- }
+ );
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java
deleted file mode 100644
index fb672635b4..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeAttributesTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.updater;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-
-public class ChangeAttributesTask implements TaskExecutor.Task<Void>
-{
- private final Map<String, Object> _attributes;
- private final ConfiguredObject _object;
-
- public ChangeAttributesTask(ConfiguredObject target, Map<String, Object> attributes)
- {
- super();
- _object = target;
- _attributes = attributes;
- }
-
- @Override
- public Void call()
- {
- _object.setAttributes(_attributes);
- return null;
- }
-
- public Map<String, Object> getAttributes()
- {
- return _attributes;
- }
-
- public ConfiguredObject getObject()
- {
- return _object;
- }
-
- @Override
- public String toString()
- {
- return "ChangeAttributesTask [object=" + _object + ", attributes=" + _attributes + "]";
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java
deleted file mode 100644
index 42ce1d8c03..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ChangeStateTask.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.updater;
-
-import java.util.concurrent.Callable;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.State;
-
-public final class ChangeStateTask implements TaskExecutor.Task<State>
-{
- private ConfiguredObject _object;
- private State _expectedState;
- private State _desiredState;
-
- public ChangeStateTask(ConfiguredObject object, State expectedState, State desiredState)
- {
- _object = object;
- _expectedState = expectedState;
- _desiredState = desiredState;
- }
-
- public ConfiguredObject getObject()
- {
- return _object;
- }
-
- public State getExpectedState()
- {
- return _expectedState;
- }
-
- public State getDesiredState()
- {
- return _desiredState;
- }
-
- @Override
- public State call()
- {
- return _object.setDesiredState(_expectedState, _desiredState);
- }
-
- @Override
- public String toString()
- {
- return "ChangeStateTask [object=" + _object + ", expectedState=" + _expectedState + ", desiredState=" + _desiredState + "]";
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java
deleted file mode 100644
index 27bc6d73f6..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/CreateChildTask.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.updater;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-
-public final class CreateChildTask implements TaskExecutor.Task<ConfiguredObject>
-{
- private ConfiguredObject _object;
- private Class<? extends ConfiguredObject> _childClass;
- private Map<String, Object> _attributes;
- private ConfiguredObject[] _otherParents;
-
- public CreateChildTask(ConfiguredObject object, Class<? extends ConfiguredObject> childClass, Map<String, Object> attributes,
- ConfiguredObject... otherParents)
- {
- _object = object;
- _childClass = childClass;
- _attributes = attributes;
- _otherParents = otherParents;
- }
-
- public ConfiguredObject getObject()
- {
- return _object;
- }
-
- public Class<? extends ConfiguredObject> getChildClass()
- {
- return _childClass;
- }
-
- public Map<String, Object> getAttributes()
- {
- return _attributes;
- }
-
- public ConfiguredObject[] getOtherParents()
- {
- return _otherParents;
- }
-
- @Override
- public ConfiguredObject call()
- {
- return _object.createChild(_childClass, _attributes, _otherParents);
- }
-
- @Override
- public String toString()
- {
- return "CreateChildTask [object=" + _object + ", childClass=" + _childClass + ", attributes=" + _attributes
- + ", otherParents=" + Arrays.toString(_otherParents) + "]";
- }
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java
index f8d29c368b..6ad8b8187f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/SetAttributeTask.java
@@ -60,7 +60,7 @@ public final class SetAttributeTask implements TaskExecutor.Task<Object>
}
@Override
- public Object call()
+ public Object execute()
{
return _object.setAttribute(_attributeName, _expectedValue, _desiredValue);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index 4e3601efcc..29b03ff962 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -49,11 +49,27 @@ public class TaskExecutor
private final AtomicReference<State> _state;
private volatile ExecutorService _executor;
- public static interface Task<X> extends Callable<X>
+ public static interface Task<X>
{
- X call();
+ X execute();
}
+ public static interface VoidTask
+ {
+ void execute();
+ }
+
+ public static interface TaskWithException<X,E extends Exception>
+ {
+ X execute() throws E;
+ }
+
+ public static interface VoidTaskWithException<E extends Exception>
+ {
+ void execute() throws E;
+ }
+
+
public TaskExecutor()
{
_state = new AtomicReference<State>(State.INITIALISING);
@@ -142,20 +158,109 @@ public class TaskExecutor
return future;
}
- public void submitAndWait(final Runnable task) throws CancellationException
+ public void run(final VoidTask task) throws CancellationException
{
- submitAndWait(new Task<Void>()
+ run(new Task<Void>()
{
@Override
- public Void call()
+ public Void execute()
{
- task.run();
+ task.execute();
return null;
}
});
}
- public <T> T submitAndWait(Task<T> task) throws CancellationException
+ private static class ExceptionTaskWrapper<T, E extends Exception> implements Task<T>
+ {
+ private final TaskWithException<T,E> _underlying;
+ private E _exception;
+
+ private ExceptionTaskWrapper(final TaskWithException<T, E> underlying)
+ {
+ _underlying = underlying;
+ }
+
+
+ @Override
+ public T execute()
+ {
+ try
+ {
+ return _underlying.execute();
+ }
+ catch (Exception e)
+ {
+ _exception = (E) e;
+ return null;
+ }
+ }
+
+ E getException()
+ {
+ return _exception;
+ }
+ }
+
+
+ private static class ExceptionVoidTaskWrapper<E extends Exception> implements Task<Void>
+ {
+ private final VoidTaskWithException<E> _underlying;
+ private E _exception;
+
+ private ExceptionVoidTaskWrapper(final VoidTaskWithException<E> underlying)
+ {
+ _underlying = underlying;
+ }
+
+
+ @Override
+ public Void execute()
+ {
+ try
+ {
+ _underlying.execute();
+
+ }
+ catch (Exception e)
+ {
+ _exception = (E) e;
+ }
+ return null;
+ }
+
+ E getException()
+ {
+ return _exception;
+ }
+ }
+
+ public <T, E extends Exception> T run(TaskWithException<T,E> task) throws CancellationException, E
+ {
+ ExceptionTaskWrapper<T,E> wrapper = new ExceptionTaskWrapper<T, E>(task);
+ T result = run(wrapper);
+ if(wrapper.getException() != null)
+ {
+ throw wrapper.getException();
+ }
+ else
+ {
+ return result;
+ }
+ }
+
+
+ public <E extends Exception> void run(VoidTaskWithException<E> task) throws CancellationException, E
+ {
+ ExceptionVoidTaskWrapper<E> wrapper = new ExceptionVoidTaskWrapper<E>(task);
+ run(wrapper);
+ if(wrapper.getException() != null)
+ {
+ throw wrapper.getException();
+ }
+ }
+
+ public <T> T run(Task<T> task) throws CancellationException
{
try
{
@@ -207,7 +312,7 @@ public class TaskExecutor
{
LOGGER.debug("Performing task " + userTask);
}
- T result = userTask.call();
+ T result = userTask.execute();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Task " + userTask + " is performed successfully with result:" + result);
@@ -215,7 +320,7 @@ public class TaskExecutor
return result;
}
- private class CallableWrapper<T> implements Task<T>
+ private class CallableWrapper<T> implements Callable<T>
{
private Task<T> _userTask;
private Subject _contextSubject;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 03b19608af..6a9b6df36c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -48,10 +48,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.ChangeAttributesTask;
-import org.apache.qpid.server.configuration.updater.ChangeStateTask;
-import org.apache.qpid.server.configuration.updater.CreateChildTask;
-import org.apache.qpid.server.configuration.updater.SetAttributeTask;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -544,23 +540,25 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public final State setDesiredState(final State currentState, final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
- if (_taskExecutor.isTaskExecutorThread())
- {
- authoriseSetDesiredState(currentState, desiredState);
- if (setState(currentState, desiredState))
- {
- notifyStateChanged(currentState, desiredState);
- return desiredState;
- }
- else
- {
- return getState();
- }
- }
- else
- {
- return _taskExecutor.submitAndWait(new ChangeStateTask(this, currentState, desiredState));
- }
+
+
+ return runTask(new TaskExecutor.Task<State>()
+ {
+ @Override
+ public State execute()
+ {
+ authoriseSetDesiredState(currentState, desiredState);
+ if (setState(currentState, desiredState))
+ {
+ notifyStateChanged(currentState, desiredState);
+ return desiredState;
+ }
+ else
+ {
+ return getState();
+ }
+ }
+ });
}
/**
@@ -727,25 +725,25 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
public Object setAttribute(final String name, final Object expected, final Object desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- if (_taskExecutor.isTaskExecutorThread())
+ return _taskExecutor.run(new TaskExecutor.Task<Object>()
{
- authoriseSetAttributes(createProxyForValidation(Collections.singletonMap(name, desired)),
- Collections.singleton(name));
-
- if (changeAttribute(name, expected, desired))
- {
- attributeSet(name, expected, desired);
- return desired;
- }
- else
+ @Override
+ public Object execute()
{
- return getAttribute(name);
+ authoriseSetAttributes(createProxyForValidation(Collections.singletonMap(name, desired)),
+ Collections.singleton(name));
+
+ if (changeAttribute(name, expected, desired))
+ {
+ attributeSet(name, expected, desired);
+ return desired;
+ }
+ else
+ {
+ return getAttribute(name);
+ }
}
- }
- else
- {
- return _taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired));
- }
+ });
}
protected boolean changeAttribute(final String name, final Object expected, final Object desired)
@@ -864,22 +862,23 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@SuppressWarnings("unchecked")
@Override
- public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C createChild(final Class<C> childClass, final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
{
- if (_taskExecutor.isTaskExecutorThread())
- {
- authoriseCreateChild(childClass, attributes, otherParents);
- C child = addChild(childClass, attributes, otherParents);
- if (child != null)
+ return _taskExecutor.run(new TaskExecutor.Task<C>() {
+
+ @Override
+ public C execute()
{
- childAdded(child);
+ authoriseCreateChild(childClass, attributes, otherParents);
+ C child = addChild(childClass, attributes, otherParents);
+ if (child != null)
+ {
+ childAdded(child);
+ }
+ return child;
}
- return child;
- }
- else
- {
- return (C)_taskExecutor.submitAndWait(new CreateChildTask(this, childClass, attributes, otherParents));
- }
+ });
}
protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
@@ -973,18 +972,39 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return _taskExecutor;
}
+ protected final <C> C runTask(TaskExecutor.Task<C> task)
+ {
+ return _taskExecutor.run(task);
+ }
+
+ protected void runTask(TaskExecutor.VoidTask task)
+ {
+ _taskExecutor.run(task);
+ }
+
+ protected final <T, E extends Exception> T runTask(TaskExecutor.TaskWithException<T,E> task) throws E
+ {
+ return _taskExecutor.run(task);
+ }
+
+ protected final <E extends Exception> void runTask(TaskExecutor.VoidTaskWithException<E> task) throws E
+ {
+ _taskExecutor.run(task);
+ }
+
+
@Override
public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- if (getTaskExecutor().isTaskExecutorThread())
+ runTask(new TaskExecutor.VoidTask()
{
- authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
- changeAttributes(attributes);
- }
- else
- {
- getTaskExecutor().submitAndWait(new ChangeAttributesTask(this, attributes));
- }
+ @Override
+ public void execute()
+ {
+ authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
+ changeAttributes(attributes);
+ }
+ });
}
protected void authoriseSetAttributes(final ConfiguredObject<?> proxyForValidation,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index dbf138b8a2..a634d25c14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@ManagedObject( defaultType = "broker" )
public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventLoggerProvider, StatisticsGatherer
@@ -178,11 +177,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
Collection<TrustStore<?>> getTrustStores();
- /*
- * TODO: Remove this method. Eventually the broker will become a registry.
- */
- VirtualHostRegistry getVirtualHostRegistry();
-
TaskExecutor getTaskExecutor();
boolean isManagementMode();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
index 68ca0f76c6..3ead1699b1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
@@ -84,121 +84,138 @@ public class SystemContextImpl extends AbstractConfiguredObject<SystemContextImp
}
@Override
- public void resolveObjects(ConfiguredObjectRecord... records)
+ public void resolveObjects(final ConfiguredObjectRecord... records)
{
-
- ConfiguredObjectFactory factory = getObjectFactory();
-
- Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
- resolvedObjects.put(getId(), this);
-
- Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents = new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
- Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
- new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
-
- boolean updatesMade;
-
- do
+ runTask(new TaskExecutor.VoidTask()
{
- updatesMade = false;
- Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
- while (iter.hasNext())
+ @Override
+ public void execute()
{
- ConfiguredObjectRecord record = iter.next();
- Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
- boolean foundParents = true;
- for (ConfiguredObjectRecord parent : record.getParents().values())
- {
- if (!resolvedObjects.containsKey(parent.getId()))
- {
- foundParents = false;
- break;
- }
- else
- {
- parents.add(resolvedObjects.get(parent.getId()));
- }
- }
- if (foundParents)
- {
- iter.remove();
- UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
- factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
- Collection<ConfiguredObjectDependency<?>> dependencies =
- recovered.getUnresolvedDependencies();
- if (dependencies.isEmpty())
- {
- updatesMade = true;
- ConfiguredObject<?> resolved = recovered.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
- }
- else
- {
- recordsWithUnresolvedDependencies.add(recovered);
- }
- }
- }
- Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
- recordsWithUnresolvedDependencies.iterator();
+ ConfiguredObjectFactory factory = getObjectFactory();
- while(unresolvedIter.hasNext())
- {
- UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
- Collection<ConfiguredObjectDependency<?>> dependencies =
- new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+ Map<UUID, ConfiguredObject<?>> resolvedObjects = new HashMap<UUID, ConfiguredObject<?>>();
+ resolvedObjects.put(getId(), SystemContextImpl.this);
+
+ Collection<ConfiguredObjectRecord> recordsWithUnresolvedParents =
+ new ArrayList<ConfiguredObjectRecord>(Arrays.asList(records));
+ Collection<UnresolvedConfiguredObject<? extends ConfiguredObject>> recordsWithUnresolvedDependencies =
+ new ArrayList<UnresolvedConfiguredObject<? extends ConfiguredObject>>();
- for(ConfiguredObjectDependency dependency : dependencies)
+ boolean updatesMade;
+
+ do
{
- if(dependency instanceof ConfiguredObjectIdDependency)
+ updatesMade = false;
+ Iterator<ConfiguredObjectRecord> iter = recordsWithUnresolvedParents.iterator();
+ while (iter.hasNext())
{
- UUID id = ((ConfiguredObjectIdDependency)dependency).getId();
- if(resolvedObjects.containsKey(id))
+ ConfiguredObjectRecord record = iter.next();
+ Collection<ConfiguredObject<?>> parents = new ArrayList<ConfiguredObject<?>>();
+ boolean foundParents = true;
+ for (ConfiguredObjectRecord parent : record.getParents().values())
+ {
+ if (!resolvedObjects.containsKey(parent.getId()))
+ {
+ foundParents = false;
+ break;
+ }
+ else
+ {
+ parents.add(resolvedObjects.get(parent.getId()));
+ }
+ }
+ if (foundParents)
{
- dependency.resolve(resolvedObjects.get(id));
+ iter.remove();
+ UnresolvedConfiguredObject<? extends ConfiguredObject> recovered =
+ factory.recover(record, parents.toArray(new ConfiguredObject<?>[parents.size()]));
+ Collection<ConfiguredObjectDependency<?>> dependencies =
+ recovered.getUnresolvedDependencies();
+ if (dependencies.isEmpty())
+ {
+ updatesMade = true;
+ ConfiguredObject<?> resolved = recovered.resolve();
+ resolvedObjects.put(resolved.getId(), resolved);
+ }
+ else
+ {
+ recordsWithUnresolvedDependencies.add(recovered);
+ }
}
+
}
- else if(dependency instanceof ConfiguredObjectNameDependency)
+
+ Iterator<UnresolvedConfiguredObject<? extends ConfiguredObject>> unresolvedIter =
+ recordsWithUnresolvedDependencies.iterator();
+
+ while (unresolvedIter.hasNext())
{
- ConfiguredObject<?> dependentObject = null;
- for(ConfiguredObject<?> parent : unresolvedObject.getParents())
+ UnresolvedConfiguredObject<? extends ConfiguredObject> unresolvedObject = unresolvedIter.next();
+ Collection<ConfiguredObjectDependency<?>> dependencies =
+ new ArrayList<ConfiguredObjectDependency<?>>(unresolvedObject.getUnresolvedDependencies());
+
+ for (ConfiguredObjectDependency dependency : dependencies)
{
- dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(), ((ConfiguredObjectNameDependency)dependency).getName());
- if(dependentObject != null)
+ if (dependency instanceof ConfiguredObjectIdDependency)
{
- break;
+ UUID id = ((ConfiguredObjectIdDependency) dependency).getId();
+ if (resolvedObjects.containsKey(id))
+ {
+ dependency.resolve(resolvedObjects.get(id));
+ }
+ }
+ else if (dependency instanceof ConfiguredObjectNameDependency)
+ {
+ ConfiguredObject<?> dependentObject = null;
+ for (ConfiguredObject<?> parent : unresolvedObject.getParents())
+ {
+ dependentObject = parent.findConfiguredObject(dependency.getCategoryClass(),
+ ((ConfiguredObjectNameDependency) dependency)
+ .getName()
+ );
+ if (dependentObject != null)
+ {
+ break;
+ }
+ }
+ if (dependentObject != null)
+ {
+ dependency.resolve(dependentObject);
+ }
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException("Unknown dependency type "
+ + dependency.getClass()
+ .getSimpleName());
}
}
- if(dependentObject != null)
+ if (unresolvedObject.getUnresolvedDependencies().isEmpty())
{
- dependency.resolve(dependentObject);
+ updatesMade = true;
+ unresolvedIter.remove();
+ ConfiguredObject<?> resolved = unresolvedObject.resolve();
+ resolvedObjects.put(resolved.getId(), resolved);
}
}
- else
- {
- throw new ServerScopedRuntimeException("Unknown dependency type " + dependency.getClass().getSimpleName());
- }
+
+ } while (updatesMade && !(recordsWithUnresolvedDependencies.isEmpty()
+ && recordsWithUnresolvedParents.isEmpty()));
+
+ if (!recordsWithUnresolvedDependencies.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot resolve some objects: "
+ + recordsWithUnresolvedDependencies);
}
- if(unresolvedObject.getUnresolvedDependencies().isEmpty())
+ if (!recordsWithUnresolvedParents.isEmpty())
{
- updatesMade = true;
- unresolvedIter.remove();
- ConfiguredObject<?> resolved = unresolvedObject.resolve();
- resolvedObjects.put(resolved.getId(), resolved);
+ throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found"
+ + recordsWithUnresolvedParents);
}
}
-
- } while(updatesMade && !(recordsWithUnresolvedDependencies.isEmpty() && recordsWithUnresolvedParents.isEmpty()));
-
- if(!recordsWithUnresolvedDependencies.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve some objects: " + recordsWithUnresolvedDependencies);
- }
- if(!recordsWithUnresolvedParents.isEmpty())
- {
- throw new IllegalArgumentException("Cannot resolve object because their parents cannot be found" + recordsWithUnresolvedParents);
- }
+ });
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index c9954467e4..28553412f7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -24,9 +24,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -59,7 +57,6 @@ import org.apache.qpid.server.security.auth.manager.SimpleAuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.util.SystemUtils;
public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> implements Broker<BrokerAdapter>, ConfigurationChangeListener, StatisticsGatherer, StatisticsGatherer.Source
@@ -77,18 +74,10 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private EventLogger _eventLogger;
- private final VirtualHostRegistry _virtualHostRegistry;
+ //private final VirtualHostRegistry _virtualHostRegistry;
private final LogRecorder _logRecorder;
- private final Map<String, VirtualHost<?,?,?>> _vhostAdapters = new HashMap<String, VirtualHost<?,?,?>>();
- private final Map<UUID, Port<?>> _portAdapters = new HashMap<UUID, Port<?>>();
private final Map<Port, Integer> _stillInUsePortNumbers = new HashMap<Port, Integer>();
- private final Map<UUID, AuthenticationProvider<?>> _authenticationProviders = new HashMap<UUID, AuthenticationProvider<?>>();
- private final Map<String, GroupProvider<?>> _groupProviders = new HashMap<String, GroupProvider<?>>();
- private final Map<UUID, ConfiguredObject<?>> _plugins = new HashMap<UUID, ConfiguredObject<?>>();
- private final Map<String, KeyStore<?>> _keyStores = new HashMap<String, KeyStore<?>>();
- private final Map<String, TrustStore<?>> _trustStores = new HashMap<String, TrustStore<?>>();
- private final Map<UUID, AccessControlProvider<?>> _accessControlProviders = new HashMap<UUID, AccessControlProvider<?>>();
private final SecurityManager _securityManager;
@@ -122,7 +111,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
parent.getTaskExecutor());
_objectFactory = parent.getObjectFactory();
- _virtualHostRegistry = new VirtualHostRegistry(parent.getEventLogger());
+ //_virtualHostRegistry = new VirtualHostRegistry(parent.getEventLogger());
_logRecorder = parent.getLogRecorder();
_eventLogger = parent.getEventLogger();
@@ -220,7 +209,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
_managementModeAuthenticationProvider.open();
}
- _virtualHostRegistry.setDefaultVirtualHostName(getDefaultVirtualHost());
for(KeyStore<?> keyStore : getChildren(KeyStore.class))
{
@@ -375,26 +363,20 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
public Collection<VirtualHost<?,?,?>> getVirtualHosts()
{
- synchronized(_vhostAdapters)
- {
- return new ArrayList<VirtualHost<?,?,?>>(_vhostAdapters.values());
- }
+ Collection children = getChildren(VirtualHost.class);
+ return children;
}
public Collection<Port<?>> getPorts()
{
- synchronized (_portAdapters)
- {
- return new ArrayList<Port<?>>(_portAdapters.values());
- }
+ Collection children = getChildren(Port.class);
+ return children;
}
public Collection<AuthenticationProvider<?>> getAuthenticationProviders()
{
- synchronized (_authenticationProviders)
- {
- return new ArrayList<AuthenticationProvider<?>>(_authenticationProviders.values());
- }
+ Collection children = getChildren(AuthenticationProvider.class);
+ return children;
}
public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName)
@@ -403,40 +385,24 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
return _managementModeAuthenticationProvider;
}
- Collection<AuthenticationProvider<?>> providers = getAuthenticationProviders();
- for (AuthenticationProvider<?> authenticationProvider : providers)
- {
- if (authenticationProvider.getName().equals(authenticationProviderName))
- {
- return authenticationProvider;
- }
- }
- return null;
+ return getChildByName(AuthenticationProvider.class, authenticationProviderName);
}
public KeyStore<?> findKeyStoreByName(String keyStoreName)
{
- synchronized(_keyStores)
- {
- return _keyStores.get(keyStoreName);
- }
+ return getChildByName(KeyStore.class, keyStoreName);
}
public TrustStore<?> findTrustStoreByName(String trustStoreName)
{
- synchronized(_trustStores)
- {
- return _trustStores.get(trustStoreName);
- }
+ return getChildByName(TrustStore.class, trustStoreName);
}
@Override
public Collection<GroupProvider<?>> getGroupProviders()
{
- synchronized (_groupProviders)
- {
- return new ArrayList<GroupProvider<?>>(_groupProviders.values());
- }
+ Collection children = getChildren(GroupProvider.class);
+ return children;
}
private VirtualHost createVirtualHost(final Map<String, Object> attributes)
@@ -462,10 +428,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException
{
- synchronized (_vhostAdapters)
- {
- _vhostAdapters.remove(vhost.getName());
- }
vhost.removeChangeListener(this);
return true;
}
@@ -476,18 +438,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return null; //TODO
}
- public long getTimeToLive()
- {
- return 0;
- }
-
- public long setTimeToLive(final long expected, final long desired)
- throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- throw new IllegalStateException();
- }
-
-
@Override
public long getBytesIn()
{
@@ -514,40 +464,48 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@SuppressWarnings("unchecked")
@Override
- public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ public <C extends ConfiguredObject> C addChild(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents)
{
- if(childClass == VirtualHost.class)
- {
- return (C) createVirtualHost(attributes);
- }
- else if(childClass == Port.class)
+ return runTask( new TaskExecutor.Task<C>()
{
- return (C) createPort(attributes);
- }
- else if(childClass == AccessControlProvider.class)
- {
- return (C) createAccessControlProvider(attributes);
- }
- else if(childClass == AuthenticationProvider.class)
- {
- return (C) createAuthenticationProvider(attributes);
- }
- else if(childClass == KeyStore.class)
- {
- return (C) createKeyStore(attributes);
- }
- else if(childClass == TrustStore.class)
- {
- return (C) createTrustStore(attributes);
- }
- else if(childClass == GroupProvider.class)
- {
- return (C) createGroupProvider(attributes);
- }
- else
- {
- throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName());
- }
+ @Override
+ public C execute()
+ {
+ if (childClass == VirtualHost.class)
+ {
+ return (C) createVirtualHost(attributes);
+ }
+ else if (childClass == Port.class)
+ {
+ return (C) createPort(attributes);
+ }
+ else if (childClass == AccessControlProvider.class)
+ {
+ return (C) createAccessControlProvider(attributes);
+ }
+ else if (childClass == AuthenticationProvider.class)
+ {
+ return (C) createAuthenticationProvider(attributes);
+ }
+ else if (childClass == KeyStore.class)
+ {
+ return (C) createKeyStore(attributes);
+ }
+ else if (childClass == TrustStore.class)
+ {
+ return (C) createTrustStore(attributes);
+ }
+ else if (childClass == GroupProvider.class)
+ {
+ return (C) createGroupProvider(attributes);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName());
+ }
+ }
+ });
+
}
/**
@@ -569,101 +527,77 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return port;
}
- private void addPort(Port<?> port)
+ private void addPort(final Port<?> port)
{
- synchronized (_portAdapters)
- {
- int portNumber = port.getPort();
- String portName = port.getName();
- UUID portId = port.getId();
+ assert getTaskExecutor().isTaskExecutorThread();
- for(Port<?> p : _portAdapters.values())
- {
- if(portNumber == p.getPort())
- {
- throw new IllegalConfigurationException("Can't add port " + portName + " because port number " + portNumber + " is already configured for port " + p.getName());
- }
-
- if(portName.equals(p.getName()))
- {
- throw new IllegalConfigurationException("Can't add Port because one with name " + portName + " already exists");
- }
+ int portNumber = port.getPort();
+ String portName = port.getName();
- if(portId.equals(p.getId()))
+ for (Port<?> p : getChildren(Port.class))
+ {
+ if(p != port)
+ {
+ if (portNumber == p.getPort())
{
- throw new IllegalConfigurationException("Can't add Port because one with id " + portId + " already exists");
+ throw new IllegalConfigurationException("Can't add port "
+ + portName
+ + " because port number "
+ + portNumber
+ + " is already configured for port "
+ + p.getName());
}
}
-
- _portAdapters.put(port.getId(), port);
}
+
port.addChangeListener(this);
+
}
- private AccessControlProvider<?> createAccessControlProvider(Map<String, Object> attributes)
+ private AccessControlProvider<?> createAccessControlProvider(final Map<String, Object> attributes)
{
- AccessControlProvider<?> accessControlProvider;
- synchronized (_accessControlProviders)
- {
- accessControlProvider = (AccessControlProvider<?>) createChild(AccessControlProvider.class, attributes);
- addAccessControlProvider(accessControlProvider);
- }
+ assert getTaskExecutor().isTaskExecutorThread();
+
+ AccessControlProvider<?> accessControlProvider = (AccessControlProvider<?>) createChild(AccessControlProvider.class, attributes);
+ addAccessControlProvider(accessControlProvider);
- boolean quiesce = isManagementMode() ;
+ boolean quiesce = isManagementMode();
accessControlProvider.setDesiredState(State.INITIALISING, quiesce ? State.QUIESCED : State.ACTIVE);
return accessControlProvider;
+
}
- /**
- * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists
- */
- private void addAccessControlProvider(AccessControlProvider<?> accessControlProvider)
+ private void addAccessControlProvider(final AccessControlProvider<?> accessControlProvider)
{
- String name = accessControlProvider.getName();
- synchronized (_accessControlProviders)
- {
- if (_accessControlProviders.containsKey(accessControlProvider.getId()))
- {
- throw new IllegalConfigurationException("Can't add AccessControlProvider because one with id " + accessControlProvider.getId() + " already exists");
- }
- for (AccessControlProvider<?> provider : _accessControlProviders.values())
- {
- if (provider.getName().equals(name))
- {
- throw new IllegalConfigurationException("Can't add AccessControlProvider because one with name " + name + " already exists");
- }
- }
- _accessControlProviders.put(accessControlProvider.getId(), accessControlProvider);
- }
+ assert getTaskExecutor().isTaskExecutorThread();
accessControlProvider.addChangeListener(this);
accessControlProvider.addChangeListener(_securityManager);
+
}
private boolean deleteAccessControlProvider(AccessControlProvider<?> accessControlProvider)
{
- AccessControlProvider removedAccessControlProvider;
- synchronized (_accessControlProviders)
- {
- removedAccessControlProvider = _accessControlProviders.remove(accessControlProvider.getId());
- }
-
- if(removedAccessControlProvider != null)
- {
- removedAccessControlProvider.removeChangeListener(this);
- removedAccessControlProvider.removeChangeListener(_securityManager);
- }
+ accessControlProvider.removeChangeListener(this);
+ accessControlProvider.removeChangeListener(_securityManager);
- return removedAccessControlProvider != null;
+ return true;
}
- private AuthenticationProvider createAuthenticationProvider(Map<String, Object> attributes)
+ private AuthenticationProvider createAuthenticationProvider(final Map<String, Object> attributes)
{
- AuthenticationProvider<?> authenticationProvider = createChild(AuthenticationProvider.class, attributes);
- addAuthenticationProvider(authenticationProvider);
- authenticationProvider.setDesiredState(State.INITIALISING, State.ACTIVE);
- return authenticationProvider;
+ return runTask(new TaskExecutor.Task<AuthenticationProvider>()
+ {
+ @Override
+ public AuthenticationProvider execute()
+ {
+ AuthenticationProvider<?> authenticationProvider = createChild(AuthenticationProvider.class, attributes);
+ addAuthenticationProvider(authenticationProvider);
+ authenticationProvider.setDesiredState(State.INITIALISING, State.ACTIVE);
+ return authenticationProvider;
+ }
+ });
}
private <X extends ConfiguredObject> X createChild(Class<X> clazz, Map<String, Object> attributes)
@@ -685,61 +619,35 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
*/
private void addAuthenticationProvider(AuthenticationProvider<?> authenticationProvider)
{
- String name = authenticationProvider.getName();
- synchronized (_authenticationProviders)
- {
- if (_authenticationProviders.containsKey(authenticationProvider.getId()))
- {
- throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with id " + authenticationProvider.getId() + " already exists");
- }
- for (AuthenticationProvider provider : _authenticationProviders.values())
- {
- if (provider.getName().equals(name))
- {
- throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists");
- }
- }
- _authenticationProviders.put(authenticationProvider.getId(), authenticationProvider);
- }
+ assert getTaskExecutor().isTaskExecutorThread();
+
authenticationProvider.addChangeListener(this);
}
- private GroupProvider<?> createGroupProvider(Map<String, Object> attributes)
+ private GroupProvider<?> createGroupProvider(final Map<String, Object> attributes)
{
- GroupProvider<?> groupProvider = createChild(GroupProvider.class, attributes);
- addGroupProvider(groupProvider);
- groupProvider.setDesiredState(State.INITIALISING, State.ACTIVE);
- return groupProvider;
+ return runTask(new TaskExecutor.Task<GroupProvider<?>>()
+ {
+ @Override
+ public GroupProvider<?> execute()
+ {
+ GroupProvider<?> groupProvider = createChild(GroupProvider.class, attributes);
+ addGroupProvider(groupProvider);
+ groupProvider.setDesiredState(State.INITIALISING, State.ACTIVE);
+ return groupProvider;
+ }
+ });
}
private void addGroupProvider(GroupProvider<?> groupProvider)
{
- synchronized (_groupProviders)
- {
- String name = groupProvider.getName();
- if(_groupProviders.containsKey(name))
- {
- throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists");
- }
- _groupProviders.put(name, groupProvider);
- }
groupProvider.addChangeListener(this);
}
private boolean deleteGroupProvider(GroupProvider groupProvider)
{
- GroupProvider removedGroupProvider = null;
- synchronized (_groupProviders)
- {
- removedGroupProvider = _groupProviders.remove(groupProvider.getName());
- }
-
- if(removedGroupProvider != null)
- {
- removedGroupProvider.removeChangeListener(this);
- }
-
- return removedGroupProvider != null;
+ groupProvider.removeChangeListener(this);
+ return true;
}
private KeyStore createKeyStore(Map<String, Object> attributes)
@@ -760,58 +668,25 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private void addKeyStore(KeyStore keyStore)
{
- synchronized (_keyStores)
- {
- if(_keyStores.containsKey(keyStore.getName()))
- {
- throw new IllegalConfigurationException("Can't add KeyStore because one with name " + keyStore.getName() + " already exists");
- }
- _keyStores.put(keyStore.getName(), keyStore);
- }
keyStore.addChangeListener(this);
}
- private boolean deleteKeyStore(KeyStore object)
+ private boolean deleteKeyStore(KeyStore keyStore)
{
- synchronized(_keyStores)
- {
- String name = object.getName();
- KeyStore removedKeyStore = _keyStores.remove(name);
- if(removedKeyStore != null)
- {
- removedKeyStore.removeChangeListener(this);
- }
-
- return removedKeyStore != null;
- }
+ keyStore.removeChangeListener(this);
+ return true;
}
private void addTrustStore(TrustStore trustStore)
{
- synchronized (_trustStores)
- {
- if(_trustStores.containsKey(trustStore.getName()))
- {
- throw new IllegalConfigurationException("Can't add TrustStore because one with name " + trustStore.getName() + " already exists");
- }
- _trustStores.put(trustStore.getName(), trustStore);
- }
trustStore.addChangeListener(this);
}
- private boolean deleteTrustStore(TrustStore object)
+ private boolean deleteTrustStore(TrustStore trustStore)
{
- synchronized(_trustStores)
- {
- String name = object.getName();
- TrustStore removedTrustStore = _trustStores.remove(name);
- if(removedTrustStore != null)
- {
- removedTrustStore.removeChangeListener(this);
- }
+ trustStore.removeChangeListener(this);
+ return true;
- return removedTrustStore != null;
- }
}
@Override
@@ -824,56 +699,36 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return super.getAttribute(name);
}
- private boolean deletePort(State oldState, Port portAdapter)
+ private boolean deletePort(State oldState, Port port)
{
- Port<?> removedPort;
- synchronized (_portAdapters)
- {
- removedPort = _portAdapters.remove(portAdapter.getId());
- }
+ port.removeChangeListener(this);
- if (removedPort != null)
- {
- removedPort.removeChangeListener(this);
+ // TODO - this seems suspicious, wouldn't it make more sense to not allow deletion from active
+ // (must be stopped first) or something?
- if(oldState == State.ACTIVE)
- {
- //Record the originally used port numbers of previously-active ports being deleted, to ensure
- //when creating new ports we don't try to re-bind a port number that we are currently still using
- recordPreviouslyUsedPortNumberIfNecessary(removedPort, removedPort.getPort());
- }
+ if(oldState == State.ACTIVE)
+ {
+ //Record the originally used port numbers of previously-active ports being deleted, to ensure
+ //when creating new ports we don't try to re-bind a port number that we are currently still using
+ recordPreviouslyUsedPortNumberIfNecessary(port, port.getPort());
}
- return removedPort != null;
+
+ return port != null;
}
private boolean deleteAuthenticationProvider(AuthenticationProvider<?> authenticationProvider)
{
- AuthenticationProvider removedAuthenticationProvider;
- synchronized (_authenticationProviders)
- {
- removedAuthenticationProvider = _authenticationProviders.remove(authenticationProvider.getId());
- }
-
- if(removedAuthenticationProvider != null)
+ if(authenticationProvider != null)
{
- removedAuthenticationProvider.removeChangeListener(this);
+ authenticationProvider.removeChangeListener(this);
}
-
- return removedAuthenticationProvider != null;
+ return true;
}
private void addVirtualHost(VirtualHost<?,?,?> virtualHost)
{
- synchronized (_vhostAdapters)
- {
- String name = virtualHost.getName();
- if (_vhostAdapters.containsKey(name))
- {
- throw new IllegalConfigurationException("Virtual host with name " + name + " is already specified!");
- }
- _vhostAdapters.put(name, virtualHost);
- }
+
virtualHost.addChangeListener(this);
}
@@ -883,16 +738,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
if (desiredState == State.ACTIVE)
{
initialiseStatisticsReporting();
- changeState(_groupProviders, currentState, State.ACTIVE, false);
- changeState(_authenticationProviders, currentState, State.ACTIVE, false);
- changeState(_accessControlProviders, currentState, State.ACTIVE, false);
-
-
- changeState(_vhostAdapters, currentState, State.ACTIVE, false);
-
- changeState(_portAdapters, currentState,State.ACTIVE, false);
- changeState(_plugins, currentState,State.ACTIVE, false);
-
+ changeChildState(currentState, State.ACTIVE, false);
if (isManagementMode())
{
_eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
@@ -908,49 +754,53 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
_reportingTimer.cancel();
}
- changeState(_plugins, currentState,State.STOPPED, true);
- changeState(_portAdapters, currentState, State.STOPPED, true);
- changeState(_vhostAdapters,currentState, State.STOPPED, true);
- changeState(_authenticationProviders, currentState, State.STOPPED, true);
- changeState(_groupProviders, currentState, State.STOPPED, true);
- _virtualHostRegistry.close();
+ changeChildState(currentState, State.STOPPED, true);
return true;
}
return false;
}
- private void changeState(Map<?, ? extends ConfiguredObject> configuredObjectMap, State currentState, State desiredState, boolean swallowException)
+ private void changeChildState(final State currentState,
+ final State desiredState,
+ final boolean swallowException)
{
- synchronized(configuredObjectMap)
+ runTask(new TaskExecutor.VoidTask()
{
- Collection<? extends ConfiguredObject> adapters = configuredObjectMap.values();
- for (ConfiguredObject configuredObject : adapters)
+ @Override
+ public void execute()
{
- if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getState()))
+ for (Class<? extends ConfiguredObject> clazz : Model.getInstance().getChildTypes(getCategoryClass()))
{
- if (LOGGER.isDebugEnabled())
+ for (ConfiguredObject configuredObject : getChildren(clazz))
{
- LOGGER.debug(configuredObject + " cannot be activated as it is " +State.QUIESCED);
- }
- continue;
- }
- try
- {
- configuredObject.setDesiredState(currentState, desiredState);
- }
- catch(RuntimeException e)
- {
- if (swallowException)
- {
- LOGGER.error("Failed to stop " + configuredObject, e);
- }
- else
- {
- throw e;
+ if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getState()))
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(configuredObject + " cannot be activated as it is " + State.QUIESCED);
+ }
+ continue;
+ }
+ try
+ {
+ configuredObject.setDesiredState(currentState, desiredState);
+ }
+ catch (RuntimeException e)
+ {
+ if (swallowException)
+ {
+ LOGGER.error("Failed to stop " + configuredObject, e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
}
}
- }
+ });
+
}
@Override
@@ -1023,24 +873,14 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private void addPlugin(ConfiguredObject<?> plugin)
{
- synchronized(_plugins)
- {
- if (_plugins.containsKey(plugin.getId()))
- {
- throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!");
- }
- _plugins.put(plugin.getId(), plugin);
- }
plugin.addChangeListener(this);
}
private Collection<ConfiguredObject<?>> getPlugins()
{
- synchronized(_plugins)
- {
- return Collections.unmodifiableCollection(_plugins.values());
- }
+ Collection children = getChildren(Plugin.class);
+ return children;
}
@Override
@@ -1058,7 +898,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@Override
public VirtualHost findVirtualHostByName(String name)
{
- return _vhostAdapters.get(name);
+ return getChildByName(VirtualHost.class, name);
}
@Override
@@ -1094,31 +934,15 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@Override
public Collection<KeyStore<?>> getKeyStores()
{
- synchronized(_keyStores)
- {
- return Collections.unmodifiableCollection(_keyStores.values());
- }
+ Collection children = getChildren(KeyStore.class);
+ return children;
}
@Override
public Collection<TrustStore<?>> getTrustStores()
{
- synchronized(_trustStores)
- {
- return Collections.unmodifiableCollection(_trustStores.values());
- }
- }
-
- @Override
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- @Override
- public TaskExecutor getTaskExecutor()
- {
- return super.getTaskExecutor();
+ Collection children = getChildren(TrustStore.class);
+ return children;
}
@Override
@@ -1149,10 +973,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@Override
public Collection<AccessControlProvider<?>> getAccessControlProviders()
{
- synchronized (_accessControlProviders)
- {
- return new ArrayList<AccessControlProvider<?>>(_accessControlProviders.values());
- }
+ Collection children = getChildren(AccessControlProvider.class);
+ return children;
}
private void recordPreviouslyUsedPortNumberIfNecessary(Port port, Integer portNumber)
@@ -1232,9 +1054,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
_messagesReceived.reset();
_dataReceived.reset();
- for (VirtualHostImpl vhost : _virtualHostRegistry.getVirtualHosts())
+ for (VirtualHost vhost : getVirtualHosts())
{
- vhost.resetStatistics();
+ if(vhost instanceof VirtualHostImpl)
+ {
+ ((VirtualHostImpl) vhost).resetStatistics();
+ }
}
}
@@ -1285,27 +1110,37 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
_eventLogger.message(BrokerMessages.STATS_MSGS(RECEIVED,
_messagesReceived.getPeak(),
_messagesReceived.getTotal()));
- Collection<VirtualHostImpl> hosts = _virtualHostRegistry.getVirtualHosts();
+ Collection<VirtualHost<?,?,?>> hosts = getVirtualHosts();
- if (hosts.size() > 1)
- {
- for (VirtualHostImpl vhost : hosts)
+ for (VirtualHost vhost : hosts)
{
- String name = vhost.getName();
- StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
- StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
- StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
- StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
- EventLogger logger = vhost.getEventLogger();
- logger.message(VirtualHostMessages.STATS_DATA(name,
- DELIVERED,
- dataDelivered.getPeak() / 1024.0,
- dataDelivered.getTotal()));
- logger.message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- logger.message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- logger.message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ if(vhost instanceof VirtualHostImpl)
+ {
+ VirtualHostImpl vhostImpl = (VirtualHostImpl) vhost;
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhostImpl.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhostImpl.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhostImpl.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhostImpl.getMessageReceiptStatistics();
+ EventLogger logger = vhostImpl.getEventLogger();
+ logger.message(VirtualHostMessages.STATS_DATA(name,
+ DELIVERED,
+ dataDelivered.getPeak() / 1024.0,
+ dataDelivered.getTotal()));
+ logger.message(VirtualHostMessages.STATS_MSGS(name,
+ DELIVERED,
+ messagesDelivered.getPeak(),
+ messagesDelivered.getTotal()));
+ logger.message(VirtualHostMessages.STATS_DATA(name,
+ RECEIVED,
+ dataReceived.getPeak() / 1024.0,
+ dataReceived.getTotal()));
+ logger.message(VirtualHostMessages.STATS_MSGS(name,
+ RECEIVED,
+ messagesReceived.getPeak(),
+ messagesReceived.getTotal()));
+ }
}
- }
if (_reset)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 45b02c7ab3..4121e24368 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -420,7 +421,7 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
//ManagementMode needs this relaxed to allow its overriding management ports to be inserted.
//Enforce only a single port of each management protocol, as the plugins will only use one.
- Collection<Port<?>> existingPorts = broker.getPorts();
+ Collection<Port<?>> existingPorts = new HashSet<Port<?>>(broker.getPorts());
existingPorts.remove(this);
for (Port<?> existingPort : existingPorts)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index a986cd8179..d694db264d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@ManagedObject( category = false, type = "AMQP")
public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
@@ -53,4 +54,6 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
@ManagedAttribute( automate = true, mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ VirtualHostImpl getVirtualHost(String name);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 8c6744fbf2..ad52f420b6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -41,11 +41,13 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.transport.TransportProvider;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> implements AmqpPort<AmqpPortImpl>
@@ -89,6 +91,17 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
return _receiveBufferSize;
}
+ @Override
+ public VirtualHostImpl getVirtualHost(String name)
+ {
+ // TODO - aliases
+ if(name == null || name.trim().length() == 0)
+ {
+ name = _broker.getDefaultVirtualHost();
+ }
+ return (VirtualHostImpl) _broker.getChildByName(VirtualHost.class, name);
+ }
+
protected Set<Protocol> getDefaultProtocols()
{
Set<Protocol> defaultProtocols = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
index 2800064298..5db250a90f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
@@ -45,7 +45,6 @@ import javax.xml.bind.DatatypeConverter;
import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.RecovererProvider;
-import org.apache.qpid.server.configuration.updater.ChangeAttributesTask;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
@@ -255,42 +254,34 @@ public class ScramSHA1AuthenticationManager
@Override
public boolean createUser(final String username, final String password, final Map<String, String> attributes)
{
- if (getTaskExecutor().isTaskExecutorThread())
+ return runTask(new TaskExecutor.Task<Boolean>()
{
- getSecurityManager().authoriseUserOperation(Operation.CREATE, username);
- if(_users.containsKey(username))
+ @Override
+ public Boolean execute()
{
- throw new IllegalArgumentException("User '"+username+"' already exists");
- }
- try
- {
- Map<String,Object> userAttrs = new HashMap<String, Object>();
- userAttrs.put(User.ID, UUID.randomUUID());
- userAttrs.put(User.NAME, username);
- userAttrs.put(User.PASSWORD, createStoredPassword(password));
- userAttrs.put(User.TYPE, SCRAM_USER_TYPE);
- ScramAuthUser user = new ScramAuthUser(userAttrs, this);
- user.create();
-
- return true;
- }
- catch (SaslException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
- else
- {
- return getTaskExecutor().submitAndWait(new TaskExecutor.Task<Boolean>()
- {
- @Override
- public Boolean call()
+ getSecurityManager().authoriseUserOperation(Operation.CREATE, username);
+ if (_users.containsKey(username))
{
- return createUser(username, password, attributes);
+ throw new IllegalArgumentException("User '" + username + "' already exists");
}
- });
- }
-
+ try
+ {
+ Map<String, Object> userAttrs = new HashMap<String, Object>();
+ userAttrs.put(User.ID, UUID.randomUUID());
+ userAttrs.put(User.NAME, username);
+ userAttrs.put(User.PASSWORD, createStoredPassword(password));
+ userAttrs.put(User.TYPE, SCRAM_USER_TYPE);
+ ScramAuthUser user = new ScramAuthUser(userAttrs, ScramSHA1AuthenticationManager.this);
+ user.create();
+
+ return true;
+ }
+ catch (SaslException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ });
}
private SecurityManager getSecurityManager()
@@ -301,115 +292,64 @@ public class ScramSHA1AuthenticationManager
@Override
public void deleteUser(final String user) throws AccountNotFoundException
{
- if (getTaskExecutor().isTaskExecutorThread())
+ runTask(new TaskExecutor.VoidTaskWithException<AccountNotFoundException>()
{
-
- final ScramAuthUser authUser = getUser(user);
- if(authUser != null)
- {
- authUser.setState(State.ACTIVE, State.DELETED);
- }
- else
+ @Override
+ public void execute() throws AccountNotFoundException
{
- throw new AccountNotFoundException("No such user: '" + user + "'");
- }
- }
- else
- {
- AccountNotFoundException e =
- getTaskExecutor().submitAndWait(new TaskExecutor.Task<AccountNotFoundException>() {
-
- @Override
- public AccountNotFoundException call()
+ final ScramAuthUser authUser = getUser(user);
+ if(authUser != null)
{
- try
- {
- deleteUser(user);
- return null;
- }
- catch (AccountNotFoundException e)
- {
- return e;
- }
-
+ authUser.setState(State.ACTIVE, State.DELETED);
+ }
+ else
+ {
+ throw new AccountNotFoundException("No such user: '" + user + "'");
}
- });
-
- if(e != null)
- {
- throw e;
}
- }
+ });
}
@Override
public void setPassword(final String username, final String password) throws AccountNotFoundException
{
- if (getTaskExecutor().isTaskExecutorThread())
+ runTask(new TaskExecutor.VoidTaskWithException<AccountNotFoundException>()
{
- final ScramAuthUser authUser = getUser(username);
- if(authUser != null)
- {
- authUser.setPassword(password);
- }
- else
+ @Override
+ public void execute() throws AccountNotFoundException
{
- throw new AccountNotFoundException("No such user: '" + username + "'");
- }
- }
- else
- {
- AccountNotFoundException e =
- getTaskExecutor().submitAndWait(new TaskExecutor.Task<AccountNotFoundException>()
- {
-
- @Override
- public AccountNotFoundException call()
- {
- try
- {
- setPassword(username, password);
- return null;
- }
- catch (AccountNotFoundException e)
- {
- return e;
- }
- }
- });
-
- if (e != null)
- {
- throw e;
+ final ScramAuthUser authUser = getUser(username);
+ if (authUser != null)
+ {
+ authUser.setPassword(password);
+ }
+ else
+ {
+ throw new AccountNotFoundException("No such user: '" + username + "'");
+ }
}
- }
+ });
}
@Override
public Map<String, Map<String, String>> getUsers()
{
- if (getTaskExecutor().isTaskExecutorThread())
+ return runTask(new TaskExecutor.Task<Map<String, Map<String, String>>>()
{
- Map<String, Map<String,String>> users = new HashMap<String, Map<String, String>>();
- for(String user : _users.keySet())
+ @Override
+ public Map<String, Map<String, String>> execute()
{
- users.put(user, Collections.<String,String>emptyMap());
- }
- return users;
- }
- else
- {
- return getTaskExecutor().submitAndWait(new TaskExecutor.Task<Map<String, Map<String, String>>>()
- {
- @Override
- public Map<String, Map<String, String>> call()
+
+ Map<String, Map<String, String>> users = new HashMap<String, Map<String, String>>();
+ for (String user : _users.keySet())
{
- return getUsers();
+ users.put(user, Collections.<String, String>emptyMap());
}
- });
- }
+ return users;
+ }
+ });
}
@Override
@@ -491,27 +431,31 @@ public class ScramSHA1AuthenticationManager
public void setAttributes(final Map<String, Object> attributes)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
- if (getTaskExecutor().isTaskExecutorThread())
+ runTask(new TaskExecutor.VoidTask()
{
- Map<String,Object> modifiedAttributes = new HashMap<String, Object>(attributes);
- final String newPassword = (String) attributes.get(User.PASSWORD);
- if(attributes.containsKey(User.PASSWORD) && !newPassword.equals(getActualAttributes().get(User.PASSWORD)))
+
+ @Override
+ public void execute()
{
- try
- {
- modifiedAttributes.put(User.PASSWORD, _authenticationManager.createStoredPassword(newPassword));
- }
- catch (SaslException e)
+ Map<String, Object> modifiedAttributes = new HashMap<String, Object>(attributes);
+ final String newPassword = (String) attributes.get(User.PASSWORD);
+ if (attributes.containsKey(User.PASSWORD)
+ && !newPassword.equals(getActualAttributes().get(User.PASSWORD)))
{
- throw new IllegalArgumentException(e);
+ try
+ {
+ modifiedAttributes.put(User.PASSWORD,
+ _authenticationManager.createStoredPassword(newPassword));
+ }
+ catch (SaslException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
}
+ ScramSHA1AuthenticationManager.ScramAuthUser.super.setAttributes(modifiedAttributes);
}
- super.setAttributes(modifiedAttributes);
- }
- else
- {
- getTaskExecutor().submitAndWait(new ChangeAttributesTask(this, attributes));
- }
+ });
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 5189bc6fee..510c071dd3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -172,7 +172,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_broker = broker;
_dtxRegistry = new DtxRegistry();
- _eventLogger = _broker.getVirtualHostRegistry().getEventLogger();
+ _eventLogger = _broker.getParent(SystemContext.class).getEventLogger();
_eventLogger.message(VirtualHostMessages.CREATED(getName()));
@@ -244,8 +244,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- _broker.getVirtualHostRegistry().registerVirtualHost(this);
-
synchronized(_aliases)
{
@@ -869,11 +867,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return null;
}
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _broker.getVirtualHostRegistry();
- }
-
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
@@ -1448,15 +1441,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
else if (desiredState == State.STOPPED)
{
- try
- {
- close();
- }
- finally
- {
- _broker.getVirtualHostRegistry().unregisterVirtualHost(this);
- }
-
+ close();
return true;
}
else if (desiredState == State.DELETED)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index d0737e8311..68469118f6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -108,8 +108,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
int getHouseKeepingActiveCount();
- VirtualHostRegistry getVirtualHostRegistry();
-
DtxRegistry getDtxRegistry();
LinkRegistry getLinkRegistry(String remoteContainerId);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
deleted file mode 100644
index cc9ff549c4..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.logging.EventLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry implements Closeable
-{
- private final Map<String, VirtualHostImpl> _registry = new ConcurrentHashMap<String, VirtualHostImpl>();
- private String _defaultVirtualHostName;
- private final EventLogger _eventLogger;
-
-
- public VirtualHostRegistry(EventLogger eventLogger)
- {
- _eventLogger = eventLogger;
- }
-
- public synchronized void registerVirtualHost(VirtualHostImpl host)
- {
- if(_registry.containsKey(host.getName()))
- {
- throw new IllegalArgumentException("Virtual Host with name " + host.getName() + " already registered.");
- }
- _registry.put(host.getName(),host);
- }
-
- public synchronized void unregisterVirtualHost(VirtualHostImpl host)
- {
- _registry.remove(host.getName());
- }
-
- public VirtualHostImpl getVirtualHost(String name)
- {
- if(name == null || name.trim().length() == 0 || "/".equals(name.trim()))
- {
- name = getDefaultVirtualHostName();
- }
-
- return _registry.get(name);
- }
-
- public VirtualHostImpl getDefaultVirtualHost()
- {
- return getVirtualHost(getDefaultVirtualHostName());
- }
-
- private String getDefaultVirtualHostName()
- {
- return _defaultVirtualHostName;
- }
-
- public void setDefaultVirtualHostName(String defaultVirtualHostName)
- {
- _defaultVirtualHostName = defaultVirtualHostName;
- }
-
-
- public Collection<VirtualHostImpl> getVirtualHosts()
- {
- return new ArrayList<VirtualHostImpl>(_registry.values());
- }
-
- public void close()
- {
- for (VirtualHostImpl virtualHost : getVirtualHosts())
- {
- virtualHost.close();
- }
- }
-
- public EventLogger getEventLogger()
- {
- return _eventLogger;
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
index ea1d22f9ef..5831fe3caa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.configuration.startup;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,12 +39,12 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class VirtualHostCreationTest extends TestCase
{
@@ -53,13 +54,14 @@ public class VirtualHostCreationTest extends TestCase
SecurityManager securityManager = mock(SecurityManager.class);
ConfigurationEntry entry = mock(ConfigurationEntry.class);
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ SystemContext systemContext = mock(SystemContext.class);
+
Broker parent = mock(Broker.class);
when(parent.getObjectFactory()).thenReturn(objectFactory);
when(parent.getSecurityManager()).thenReturn(securityManager);
when(parent.getCategoryClass()).thenReturn(Broker.class);
- VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class);
- when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class));
- when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry);
+ when(systemContext.getEventLogger()).thenReturn(mock(EventLogger.class));
+ when(parent.getParent(eq(SystemContext.class))).thenReturn(systemContext);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
@@ -90,11 +92,11 @@ public class VirtualHostCreationTest extends TestCase
public void checkMandatoryAttributesAreValidated(String[] mandatoryAttributes, Map<String, Object> attributes)
{
SecurityManager securityManager = mock(SecurityManager.class);
+ SystemContext systemContext = mock(SystemContext.class);
Broker parent = mock(Broker.class);
when(parent.getSecurityManager()).thenReturn(securityManager);
- VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class);
- when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class));
- when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry);
+ when(parent.getParent(eq(SystemContext.class))).thenReturn(systemContext);
+ when(systemContext.getEventLogger()).thenReturn(mock(EventLogger.class));
for (String name : mandatoryAttributes)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
index 083160a31d..fb331c73c1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
@@ -135,10 +135,10 @@ public class TaskExecutorTest extends TestCase
public void testSubmitAndWait() throws Exception
{
_executor.start();
- Object result = _executor.submitAndWait(new TaskExecutor.Task<Object>()
+ Object result = _executor.run(new TaskExecutor.Task<Object>()
{
@Override
- public String call()
+ public String execute()
{
return "DONE";
}
@@ -149,7 +149,7 @@ public class TaskExecutorTest extends TestCase
public void testSubmitAndWaitInNotAuthorizedContext()
{
_executor.start();
- Object subject = _executor.submitAndWait(new SubjectRetriever());
+ Object subject = _executor.run(new SubjectRetriever());
assertNull("Subject must be null", subject);
}
@@ -162,7 +162,7 @@ public class TaskExecutorTest extends TestCase
@Override
public Object run()
{
- return _executor.submitAndWait(new SubjectRetriever());
+ return _executor.run(new SubjectRetriever());
}
});
assertEquals("Unexpected subject", subject, result);
@@ -176,7 +176,7 @@ public class TaskExecutorTest extends TestCase
@Override
public Object run()
{
- return _executor.submitAndWait(new SubjectRetriever());
+ return _executor.run(new SubjectRetriever());
}
});
assertEquals("Unexpected subject", null, result);
@@ -188,11 +188,11 @@ public class TaskExecutorTest extends TestCase
_executor.start();
try
{
- _executor.submitAndWait(new TaskExecutor.Task<Object>()
+ _executor.run(new TaskExecutor.Task<Object>()
{
@Override
- public Void call()
+ public Void execute()
{
throw exception;
}
@@ -215,15 +215,15 @@ public class TaskExecutorTest extends TestCase
@Override
public Object run()
{
- _executor.submitAndWait(new TaskExecutor.Task<Object>()
- {
- @Override
- public Void call()
+ _executor.run(new TaskExecutor.Task<Object>()
{
- taskSubject.set(Subject.getSubject(AccessController.getContext()));
- return null;
- }
- });
+ @Override
+ public Void execute()
+ {
+ taskSubject.set(Subject.getSubject(AccessController.getContext()));
+ return null;
+ }
+ });
return null;
}
});
@@ -234,7 +234,7 @@ public class TaskExecutorTest extends TestCase
private class SubjectRetriever implements TaskExecutor.Task<Subject>
{
@Override
- public Subject call()
+ public Subject execute()
{
return Subject.getSubject(AccessController.getContext());
}
@@ -251,7 +251,7 @@ public class TaskExecutorTest extends TestCase
}
@Override
- public Void call()
+ public Void execute()
{
if (_waitLatch != null)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 064cfe651a..ec108a419d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -43,6 +43,7 @@ public class VirtualHostTest extends QpidTestCase
private Broker _broker;
private StatisticsGatherer _statisticsGatherer;
private RecovererProvider _recovererProvider;
+ private TaskExecutor _taskExecutor;
@Override
protected void setUp() throws Exception
@@ -50,14 +51,22 @@ public class VirtualHostTest extends QpidTestCase
super.setUp();
_broker = BrokerTestHelper.createBrokerMock();
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
- when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
_recovererProvider = mock(RecovererProvider.class);
_statisticsGatherer = mock(StatisticsGatherer.class);
}
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _taskExecutor.stopImmediately();
+ super.tearDown();
+ }
+
public void testInitialisingState()
{
VirtualHost host = createHost();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
index 9c6a840d76..d8ee851813 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
@@ -49,6 +49,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
private Broker _broker;
private String _user1, _user2;
private File _preferencesFile;
+ private TaskExecutor _taskExecutor;
protected void setUp() throws Exception
{
@@ -60,9 +61,10 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
_preferencesFile = TestFileUtils.createTempFile(this, ".prefs.json", TEST_PREFERENCES);
_broker = BrokerTestHelper.createBrokerMock();
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
- when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ when(_broker.getTaskExecutor()).thenReturn(_taskExecutor);
+
when(_authenticationProvider.getParent(Broker.class)).thenReturn(_broker);
}
@@ -76,6 +78,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
}
BrokerTestHelper.tearDown();
_preferencesFile.delete();
+ _taskExecutor.stopImmediately();
}
finally
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 01f4ed4299..e350774772 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -62,7 +62,6 @@ import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class BrokerTestHelper
{
@@ -80,17 +79,22 @@ public class BrokerTestHelper
{
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
SubjectCreator subjectCreator = mock(SubjectCreator.class);
+
when(subjectCreator.getMechanisms()).thenReturn("");
Broker broker = mock(Broker.class);
when(broker.getConnection_sessionCountLimit()).thenReturn(1);
when(broker.getConnection_closeWhenNoRoute()).thenReturn(false);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);
- when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger()));
when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false));
when(broker.getObjectFactory()).thenReturn(objectFactory);
when(broker.getEventLogger()).thenReturn(new EventLogger());
when(broker.getCategoryClass()).thenReturn(Broker.class);
+
+ SystemContext systemContext = mock(SystemContext.class);
+ when(systemContext.getEventLogger()).thenReturn(new EventLogger());
+ when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext);
+
return broker;
}
@@ -102,7 +106,7 @@ public class BrokerTestHelper
{
}
- public static VirtualHostImpl createVirtualHost(VirtualHostRegistry virtualHostRegistry, Map<String,Object> attributes)
+ public static VirtualHostImpl createVirtualHost(Map<String, Object> attributes)
throws Exception
{
@@ -115,7 +119,6 @@ public class BrokerTestHelper
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
Broker broker = mock(Broker.class);
when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext);
- when(broker.getVirtualHostRegistry()).thenReturn(virtualHostRegistry);
when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
SecurityManager securityManager = new SecurityManager(broker, false);
when(broker.getSecurityManager()).thenReturn(securityManager);
@@ -133,11 +136,6 @@ public class BrokerTestHelper
public static VirtualHostImpl createVirtualHost(String name) throws Exception
{
- return createVirtualHost(name, new VirtualHostRegistry(new EventLogger()));
- }
-
- public static VirtualHostImpl createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception
- {
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(org.apache.qpid.server.model.VirtualHost.TYPE, StandardVirtualHost.TYPE);
@@ -147,7 +145,7 @@ public class BrokerTestHelper
attributes.put(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
attributes.put(org.apache.qpid.server.model.VirtualHost.NAME, name);
- return createVirtualHost(virtualHostRegistry, attributes);
+ return createVirtualHost(attributes);
}
public static AMQSessionModel createSession(int channelId, AMQConnectionModel connection)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 85eede527a..942781e7ba 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -66,12 +66,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
- @Override
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return null;
- }
-
public AuthenticationManager getAuthenticationManager()
{
return null;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index c2ef68d812..6ea52549bf 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -44,6 +44,7 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
@@ -203,7 +204,8 @@ public class ServerConnectionDelegate extends ServerDelegate
{
vhostName = "";
}
- vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+ vhost = ((AmqpPort)sconn.getPort()).getVirtualHost(vhostName);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
index 3608b81e2a..24391f6d77 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -72,7 +73,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
}
- VirtualHostImpl virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+ VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName);
if (virtualHost == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
index 93c7c2e778..af2ceeca7f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
@@ -34,16 +34,13 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import javax.security.auth.Subject;
-import java.security.PrivilegedAction;
+
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.CopyOnWriteArraySet;
/**
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
@@ -142,11 +139,6 @@ public class AMQStateManager implements AMQMethodListener
}
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _broker.getVirtualHostRegistry();
- }
-
public AMQProtocolSession getProtocolSession()
{
return _protocolSession;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 031febbb1a..5b8d3a488e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -49,6 +49,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
@@ -133,7 +134,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
host = _broker.getDefaultVirtualHost();
}
- _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+
+ _vhost = ((AmqpPort)_port).getVirtualHost(host);
if(_vhost == null)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index e674cef2d0..de2b594211 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -51,26 +50,16 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
super.setUp();
BrokerTestHelper.setUp();
_broker = BrokerTestHelper.createBrokerMock();
- VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry();
when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default");
when(_broker.getDefaultVirtualHost()).thenReturn("default");
- // AMQP 1-0 connection needs default vhost to be present
- _virtualHost = BrokerTestHelper.createVirtualHost("default", virtualHostRegistry);
}
@Override
protected void tearDown() throws Exception
{
- try
- {
- _virtualHost.close();
- }
- finally
- {
BrokerTestHelper.tearDown();
super.tearDown();
- }
}
private static final byte[] AMQP_0_8_HEADER =
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index b1941f4d39..1b0f44af49 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -38,7 +38,6 @@ import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Binding;
@@ -61,7 +60,6 @@ import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -180,7 +178,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
try
{
- _virtualHost = (AbstractVirtualHost<?>) BrokerTestHelper.createVirtualHost(new VirtualHostRegistry(new EventLogger()), _attributes);
+ _virtualHost = (AbstractVirtualHost<?>) BrokerTestHelper.createVirtualHost(_attributes);
}
catch (Exception e)
{