diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-04-12 15:03:34 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-04-12 15:03:34 +0000 |
| commit | 3a7a946e952545d34966a5569839b631df92e448 (patch) | |
| tree | 2c4b45c3c404d623dc7d4ede882a4eae8f02b148 /qpid/java | |
| parent | b096615214f5368b0f302954c0c642b057c49301 (diff) | |
| download | qpid-python-3a7a946e952545d34966a5569839b631df92e448.tar.gz | |
QPID-5818 : [Java Broker] creating children from within the configuration thread leads to deadlock as the configuration thread blocks waiting for a task which cannot be executed because it needs the config thread. Instead use asynchronous child creation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673014 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
17 files changed, 286 insertions, 254 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 852f512ca6..731cfd0895 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -46,6 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1651,8 +1652,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public String toString() { - return getClass().getSimpleName() + "[name=" + getName() + ", categoryClass=" + getCategoryClass() + ", type=" - + getType() + ", id=" + getId() + "]"; + return AbstractConfiguredObject.this.getClass().getSimpleName() + "[name=" + getName() + ", categoryClass=" + getCategoryClass() + ", type=" + + getType() + ", id=" + getId() + ", attributes=" + getAttributes() + "]"; } }; } @@ -1662,27 +1663,49 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im public <C extends ConfiguredObject> C createChild(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { - return _taskExecutor.run(new Task<C>() { + return doSync(createChildAsync(childClass, attributes, otherParents)); + } + @SuppressWarnings("unchecked") + @Override + public <C extends ConfiguredObject> ListenableFuture<C> createChildAsync(final Class<C> childClass, final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + return doOnConfigThread(new Callable<ListenableFuture<C>>() + { @Override - public C execute() + public ListenableFuture<C> call() throws Exception { authoriseCreateChild(childClass, attributes, otherParents); - C child = addChild(childClass, attributes, otherParents); - if (child != null) - { - childAdded(child); - } - return child; + return doAfter(addChildAsync(childClass, attributes, otherParents), + new CallableWithArgument<ListenableFuture<C>, C>() + { + + @Override + public ListenableFuture<C> call(final C child) throws Exception + { + if (child != null) + { + childAdded(child); + } + return Futures.immediateFuture(child); + } + }); } }); } + protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { throw new UnsupportedOperationException(); } + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + { + throw new UnsupportedOperationException(); + } + private <C extends ConfiguredObject> void registerChild(final C child) { synchronized(_children) @@ -1861,18 +1884,18 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im doSync(setAttributesAsync(attributes)); } - protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second) + protected final ChainedListenableFuture<Void> doAfter(ListenableFuture<?> first, final Runnable second) { return doAfter(getTaskExecutor().getExecutor(), first, second); } - protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second) + protected static <V> ChainedListenableFuture<Void> doAfter(Executor executor, ListenableFuture<V> first, final Runnable second) { - final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); - Futures.addCallback(first, new FutureCallback<Void>() + final ChainedSettableFuture<Void> returnVal = new ChainedSettableFuture<Void>(executor); + Futures.addCallback(first, new FutureCallback<V>() { @Override - public void onSuccess(final Void result) + public void onSuccess(final V result) { try { @@ -1895,13 +1918,19 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return returnVal; } - public static interface ChainedListenableFuture extends ListenableFuture<Void> + public interface CallableWithArgument<V,A> + { + V call(A argument) throws Exception; + } + + public static interface ChainedListenableFuture<V> extends ListenableFuture<V> { - ChainedListenableFuture then(Runnable r); - ChainedListenableFuture then(Callable<ListenableFuture<Void>> r); + ChainedListenableFuture<Void> then(Runnable r); + ChainedListenableFuture<V> then(Callable<ListenableFuture<V>> r); + <A> ChainedListenableFuture<A> then(CallableWithArgument<ListenableFuture<A>,V> r); } - public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture + public static class ChainedSettableFuture<V> extends AbstractFuture<V> implements ChainedListenableFuture<V> { private final Executor _exector; @@ -1911,7 +1940,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } @Override - public boolean set(Void value) + public boolean set(V value) { return super.set(value); } @@ -1923,40 +1952,96 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } @Override - public ChainedListenableFuture then(final Runnable r) + public ChainedListenableFuture<Void> then(final Runnable r) { return doAfter(_exector, this, r); } @Override - public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r) + public ChainedListenableFuture<V> then(final Callable<ListenableFuture<V>> r) { return doAfter(_exector, this,r); } + + @Override + public <A> ChainedListenableFuture<A> then(final CallableWithArgument<ListenableFuture<A>,V> r) + { + return doAfter(_exector, this, r); + } } - protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + protected final <V> ChainedListenableFuture<V> doAfter(ListenableFuture<V> first, final Callable<ListenableFuture<V>> second) { return doAfter(getTaskExecutor().getExecutor(), first, second); } - protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second) + protected final <V,A> ChainedListenableFuture<V> doAfter(ListenableFuture<A> first, final CallableWithArgument<ListenableFuture<V>,A> second) { - final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor); - Futures.addCallback(first, new FutureCallback<Void>() + return doAfter(getTaskExecutor().getExecutor(), first, second); + } + + + protected static <V> ChainedListenableFuture<V> doAfter(final Executor executor, ListenableFuture<V> first, final Callable<ListenableFuture<V>> second) + { + final ChainedSettableFuture<V> returnVal = new ChainedSettableFuture<V>(executor); + Futures.addCallback(first, new FutureCallback<V>() { @Override - public void onSuccess(final Void result) + public void onSuccess(final V result) { try { - final ListenableFuture<Void> future = second.call(); - Futures.addCallback(future, new FutureCallback<Void>() + final ListenableFuture<V> future = second.call(); + Futures.addCallback(future, new FutureCallback<V>() { @Override - public void onSuccess(final Void result) + public void onSuccess(final V result) { - returnVal.set(null); + returnVal.set(result); + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + } + catch(Throwable e) + { + returnVal.setException(e); + } + } + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + }, executor); + + return returnVal; + } + + + protected static <V,A> ChainedListenableFuture<V> doAfter(final Executor executor, ListenableFuture<A> first, final CallableWithArgument<ListenableFuture<V>,A> second) + { + final ChainedSettableFuture<V> returnVal = new ChainedSettableFuture<>(executor); + Futures.addCallback(first, new FutureCallback<A>() + { + @Override + public void onSuccess(final A result) + { + try + { + final ListenableFuture<V> future = second.call(result); + Futures.addCallback(future, new FutureCallback<V>() + { + @Override + public void onSuccess(final V result) + { + returnVal.set(result); } @Override @@ -1983,6 +2068,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return returnVal; } + @Override public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java index f97d2dfe14..d1a044f9dd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.model; import java.util.HashMap; import java.util.Map; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -72,14 +74,21 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf final SettableFuture<X> returnVal = SettableFuture.create(); final X instance = createInstance(attributes, parents); final ListenableFuture<Void> createFuture = instance.createAsync(); - createFuture.addListener(new Runnable() + Futures.addCallback(createFuture, new FutureCallback<Void>() { @Override - public void run() + public void onSuccess(final Void result) { returnVal.set(instance); } - }, MoreExecutors.sameThreadExecutor()); + + @Override + public void onFailure(final Throwable t) + { + returnVal.setException(t); + } + },MoreExecutors.sameThreadExecutor()); + return returnVal; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index b058ec95b8..bbe7916883 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -236,6 +236,9 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents); + <C extends ConfiguredObject> ListenableFuture<C> createChildAsync(Class<C> childClass, + Map<String, Object> attributes, + ConfiguredObject... otherParents); void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 09e911d627..3fc6e13e87 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.model.adapter; import java.security.AccessControlException; import java.security.PrivilegedAction; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -42,7 +43,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.messages.BrokerMessages; @@ -288,7 +288,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } } - final boolean brokerShutdownOnErroredChild = getContextValue(Boolean.class, BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD); + final boolean brokerShutdownOnErroredChild = getContextValue(Boolean.class, + BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD); if (!_parent.isManagementMode() && brokerShutdownOnErroredChild && hasBrokerAnyErroredChildren) { throw new IllegalStateException(String.format("Broker context variable %s is set and the broker has %s children", @@ -497,24 +498,32 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return children; } - private VirtualHostNode<?> createVirtualHostNode(Map<String, Object> attributes) + private ListenableFuture<VirtualHostNode> createVirtualHostNodeAsync(Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException { - final VirtualHostNode virtualHostNode = getObjectFactory().create(VirtualHostNode.class,attributes, this); - - // permission has already been granted to create the virtual host - // disable further access check on other operations, e.g. create exchange - Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - virtualHostNode.start(); - return null; - } - }); - return virtualHostNode; + return doAfter(getObjectFactory().createAsync(VirtualHostNode.class, attributes, this), + new CallableWithArgument<ListenableFuture<VirtualHostNode>, VirtualHostNode>() + { + @Override + public ListenableFuture<VirtualHostNode> call(final VirtualHostNode virtualHostNode) + throws Exception + { + // permission has already been granted to create the virtual host + // disable further access check on other operations, e.g. create exchange + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + virtualHostNode.start(); + return null; + } + }); + return Futures.immediateFuture(virtualHostNode); + } + }); } @Override @@ -543,140 +552,63 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @SuppressWarnings("unchecked") @Override - public <C extends ConfiguredObject> C addChild(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { - return runTask(new Task<C>() + if (childClass == VirtualHostNode.class) { - @Override - public C execute() - { - if (childClass == VirtualHostNode.class) - { - return (C) createVirtualHostNode(attributes); - } - else if (childClass == Port.class) - { - return (C) createPort(attributes); - } - else if (childClass == AccessControlProvider.class) - { - return (C) createAccessControlProvider(attributes); - } - else if (childClass == AuthenticationProvider.class) - { - return (C) createAuthenticationProvider(attributes); - } - else if (childClass == KeyStore.class) - { - return (C) createKeyStore(attributes); - } - else if (childClass == TrustStore.class) - { - return (C) createTrustStore(attributes); - } - else if (childClass == GroupProvider.class) - { - return (C) createGroupProvider(attributes); - } - else - { - return createChild(childClass, attributes); - } - } - }); - - } - - /** - * Called when adding a new port via the management interface - */ - private Port<?> createPort(Map<String, Object> attributes) - { - Port<?> port = createChild(Port.class, attributes); - addPort(port); - return port; - } - - private void addPort(final Port<?> port) - { - port.addChangeListener(this); - - } - - private AccessControlProvider<?> createAccessControlProvider(final Map<String, Object> attributes) - { - final Collection<AccessControlProvider<?>> currentProviders = getAccessControlProviders(); - if(currentProviders != null && !currentProviders.isEmpty()) + return (ListenableFuture<C>) createVirtualHostNodeAsync(attributes); + } + else if (Arrays.asList(Port.class, + AccessControlProvider.class, + AuthenticationProvider.class, + KeyStore.class, + TrustStore.class, + GroupProvider.class).contains(childClass)) { - throw new IllegalConfigurationException("Cannot add a second AccessControlProvider"); + return createAndAddChangeListener(childClass, attributes); + } + else + { + return getObjectFactory().createAsync(childClass, attributes, this); } - AccessControlProvider<?> accessControlProvider = (AccessControlProvider<?>) createChild(AccessControlProvider.class, attributes); - accessControlProvider.addChangeListener(this); - return accessControlProvider; } - private boolean deleteAccessControlProvider(AccessControlProvider<?> accessControlProvider) + private <V extends ConfiguredObject> ListenableFuture<V> createAndAddChangeListener(Class<V> clazz, Map<String,Object> attributes) { - accessControlProvider.removeChangeListener(this); - - return true; + return addChangeListener(getObjectFactory().createAsync(clazz, attributes, this)); } - private AuthenticationProvider createAuthenticationProvider(final Map<String, Object> attributes) + private <V extends ConfiguredObject> ListenableFuture<V> addChangeListener(ListenableFuture<V> child) { - return runTask(new Task<AuthenticationProvider>() + return doAfter(child, new CallableWithArgument<ListenableFuture<V>, V>() { @Override - public AuthenticationProvider execute() + public ListenableFuture<V> call(final V child) throws Exception { - AuthenticationProvider<?> authenticationProvider = createChild(AuthenticationProvider.class, attributes); - addAuthenticationProvider(authenticationProvider); - - return authenticationProvider; + child.addChangeListener(BrokerAdapter.this); + return Futures.immediateFuture(child); } }); } - private <X extends ConfiguredObject> X createChild(Class<X> clazz, Map<String, Object> attributes) + private AccessControlProvider<?> createAccessControlProvider(final Map<String, Object> attributes) { - if(!attributes.containsKey(ConfiguredObject.ID)) - { - attributes = new HashMap<String, Object>(attributes); - attributes.put(ConfiguredObject.ID, UUID.randomUUID()); - } - final X instance = (X) getObjectFactory().create(clazz,attributes, this); - return instance; - } - - /** - * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists - */ - private void addAuthenticationProvider(AuthenticationProvider<?> authenticationProvider) - { - authenticationProvider.addChangeListener(this); - } + AccessControlProvider<?> accessControlProvider = (AccessControlProvider<?>) (AccessControlProvider) getObjectFactory() + .create(AccessControlProvider.class, attributes, this); + accessControlProvider.addChangeListener(this); - private GroupProvider<?> createGroupProvider(final Map<String, Object> attributes) - { - return runTask(new Task<GroupProvider<?>>() - { - @Override - public GroupProvider<?> execute() - { - GroupProvider<?> groupProvider = createChild(GroupProvider.class, attributes); - addGroupProvider(groupProvider); + return accessControlProvider; - return groupProvider; - } - }); } - private void addGroupProvider(GroupProvider<?> groupProvider) + private boolean deleteAccessControlProvider(AccessControlProvider<?> accessControlProvider) { - groupProvider.addChangeListener(this); + accessControlProvider.removeChangeListener(this); + + return true; } private boolean deleteGroupProvider(GroupProvider groupProvider) @@ -685,38 +617,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return true; } - private KeyStore createKeyStore(Map<String, Object> attributes) - { - - KeyStore<?> keyStore = createChild(KeyStore.class, attributes); - - addKeyStore(keyStore); - return keyStore; - } - - private TrustStore createTrustStore(Map<String, Object> attributes) - { - TrustStore trustStore = createChild(TrustStore.class, attributes); - addTrustStore(trustStore); - return trustStore; - } - - private void addKeyStore(KeyStore keyStore) - { - keyStore.addChangeListener(this); - } - private boolean deleteKeyStore(KeyStore keyStore) { keyStore.removeChangeListener(this); return true; } - private void addTrustStore(TrustStore trustStore) - { - trustStore.addChangeListener(this); - } - private boolean deleteTrustStore(TrustStore trustStore) { trustStore.removeChangeListener(this); @@ -740,11 +646,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return true; } - private void addVirtualHostNode(VirtualHostNode<?> virtualHostNode) - { - virtualHostNode.addChangeListener(this); - } - private boolean deleteVirtualHostNode(final VirtualHostNode virtualHostNode) throws AccessControlException, IllegalStateException { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 0b7be2c28a..2cca7cdc84 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -231,7 +231,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == Session.class) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index 7c34adbaa0..07c17c5c92 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -211,7 +211,7 @@ public class FileBasedGroupProviderImpl } @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if (childClass == Group.class) @@ -231,7 +231,7 @@ public class FileBasedGroupProviderImpl attrMap.put(Group.NAME, groupName); GroupAdapter groupAdapter = new GroupAdapter(attrMap); groupAdapter.create(); - return (C) groupAdapter; + return Futures.immediateFuture((C) groupAdapter); } @@ -433,7 +433,7 @@ public class FileBasedGroupProviderImpl @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { @@ -448,7 +448,7 @@ public class FileBasedGroupProviderImpl attrMap.put(GroupMember.NAME, memberName); GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap); groupMemberAdapter.create(); - return (C) groupMemberAdapter; + return Futures.immediateFuture((C) groupMemberAdapter); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 568fb546b5..8b977da478 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -3003,7 +3003,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { @@ -3016,12 +3016,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey)) { - return (C) binding; + return Futures.immediateFuture((C) binding); } } return null; } - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java index 8f53eec4f2..6e9d9b7cd1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java @@ -141,15 +141,21 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica @SuppressWarnings("unchecked") @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == PreferencesProvider.class) { attributes = new HashMap<>(attributes); - PreferencesProvider<?> pp = getObjectFactory().create(PreferencesProvider.class, attributes, this); - - _preferencesProvider = pp; - return (C)pp; + return doAfter(getObjectFactory().createAsync(PreferencesProvider.class, attributes, this), + new CallableWithArgument<ListenableFuture<C>, PreferencesProvider>() + { + @Override + public ListenableFuture<C> call(final PreferencesProvider preferencesProvider) throws Exception + { + _preferencesProvider = preferencesProvider; + return Futures.immediateFuture((C)preferencesProvider); + } + }); } throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName()); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java index 50a2a36130..65e6128d71 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java @@ -32,6 +32,9 @@ import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.Task; import org.apache.qpid.server.configuration.updater.VoidTaskWithException; import org.apache.qpid.server.model.Broker; @@ -189,7 +192,7 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend } @Override - public <C extends ConfiguredObject> C addChild(final Class<C> childClass, + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { @@ -203,9 +206,9 @@ public abstract class ConfigModelPasswordManagingAuthenticationProvider<X extend attributes.put(User.PASSWORD, createStoredPassword((String) attributes.get(User.PASSWORD))); ManagedUser user = new ManagedUser(attributes, ConfigModelPasswordManagingAuthenticationProvider.this); user.create(); - return (C)getUser(username); + return Futures.immediateFuture((C)getUser(username)); } - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } abstract void validateUser(final ManagedUser managedUser); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 6631ebab54..aa85b887c2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -334,7 +334,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, + public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { @@ -351,10 +351,10 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal throw new IllegalArgumentException("User '" + username + "' was not added into principal database"); } _userMap.put(p, principalAdapter); - return (C)principalAdapter; + return Futures.immediateFuture((C)principalAdapter); } - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java index 96d32f4179..8669ce2304 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java @@ -58,17 +58,17 @@ public class GroupImpl extends AbstractConfiguredObject<GroupImpl> implements Gr } @Override - protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { if(childClass == GroupMember.class) { - return (C) getObjectFactory().create(childClass, attributes, this); + return getObjectFactory().createAsync(childClass, attributes, this); } else { - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java index 7dc032cc90..be3e405c82 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java @@ -75,19 +75,18 @@ public class GroupProviderImpl extends AbstractConfiguredObject<GroupProviderImp } @Override - protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) { if(childClass == Group.class) { - C child = (C) getObjectFactory().create(childClass, attributes, this); + return getObjectFactory().createAsync(childClass, attributes, this); - return child; } else { - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index ccbee865fb..e74da76d4e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -484,17 +484,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { checkVHostStateIsActive(); if(childClass == Exchange.class) { - return (C) addExchange(attributes); + return (ListenableFuture<C>) addExchangeAsync(attributes); } else if(childClass == Queue.class) { - return (C) addQueue(attributes); + return (ListenableFuture<C>) addQueueAsync(attributes); } else if(childClass == VirtualHostAlias.class) @@ -693,7 +693,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return (AMQQueue<?> )createChild(Queue.class, attributes); } - private AMQQueue<?> addQueue(Map<String, Object> attributes) throws QueueExistsException + private ListenableFuture<? extends AMQQueue<?>> addQueueAsync(Map<String, Object> attributes) throws QueueExistsException { if (shouldCreateDLQ(attributes)) { @@ -704,7 +704,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte attributes = new LinkedHashMap<String, Object>(attributes); attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName); } - return addQueueWithoutDLQ(attributes); + return Futures.immediateFuture(addQueueWithoutDLQ(attributes)); } private AMQQueue<?> addQueueWithoutDLQ(Map<String, Object> attributes) throws QueueExistsException @@ -778,34 +778,34 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - private ExchangeImpl addExchange(Map<String,Object> attributes) - throws ExchangeExistsException, ReservedExchangeNameException, - NoFactoryForTypeException - { - try - { - return (ExchangeImpl) getObjectFactory().create(Exchange.class, attributes, this); - } - catch (DuplicateNameException e) - { - throw new ExchangeExistsException(String.format("Exchange with name '%s' already exists", e.getName()), getExchange(e.getName())); - } - - } - private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes) throws ExchangeExistsException, ReservedExchangeNameException, NoFactoryForTypeException { - try - { - ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this); - return result; - } - catch (DuplicateNameException e) - { - throw new ExchangeExistsException(getExchange(e.getName())); - } + final SettableFuture<ExchangeImpl> returnVal = SettableFuture.create(); + Futures.addCallback(getObjectFactory().createAsync(Exchange.class, attributes, this), + new FutureCallback<Exchange>() + { + @Override + public void onSuccess(final Exchange result) + { + returnVal.set((ExchangeImpl) result); + } + + @Override + public void onFailure(final Throwable t) + { + if(t instanceof DuplicateNameException) + { + returnVal.setException(new ExchangeExistsException(getExchange(((DuplicateNameException)t).getName()))); + } + else + { + returnVal.setException(t); + } + } + }); + return returnVal; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 7bd1b14e42..018945b147 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -60,14 +60,14 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, + protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == VirtualHost.class) { - return (C) getObjectFactory().create(VirtualHost.class, attributes, this); + return getObjectFactory().createAsync(childClass, attributes, this); } - return super.addChild(childClass, attributes, otherParents); + return super.addChildAsync(childClass, attributes, otherParents); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/MD5AuthenticationManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/MD5AuthenticationManagerTest.java index 25540dcb92..5281986325 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/MD5AuthenticationManagerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/MD5AuthenticationManagerTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.security.auth.manager; import javax.security.sasl.SaslServer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.qpid.server.model.User; import org.apache.qpid.server.security.auth.AuthenticationResult; @@ -89,13 +90,13 @@ public class MD5AuthenticationManagerTest extends ManagedAuthenticationManagerTe return getAuthManager().authenticate(ss, response); } - private User createUser(String userName, String userPassword) + private User createUser(String userName, String userPassword) throws ExecutionException, InterruptedException { final Map<String, Object> childAttrs = new HashMap<String, Object>(); childAttrs.put(User.NAME, userName); childAttrs.put(User.PASSWORD, userPassword); - User user = getAuthManager().addChild(User.class, childAttrs); + User user = getAuthManager().addChildAsync(User.class, childAttrs).get(); assertNotNull("User should be created but addChild returned null", user); assertEquals(userName, user.getName()); return user; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ManagedAuthenticationManagerTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ManagedAuthenticationManagerTestBase.java index 65cf7ad9d9..b23feb9605 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ManagedAuthenticationManagerTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ManagedAuthenticationManagerTestBase.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.SaslException; @@ -120,7 +121,7 @@ abstract class ManagedAuthenticationManagerTestBase extends QpidTestCase } - public void testAddChildAndThenDelete() + public void testAddChildAndThenDelete() throws ExecutionException, InterruptedException { // No children should be present before the test starts assertEquals("No users should be present before the test starts", 0, _authManager.getChildren(User.class).size()); @@ -130,7 +131,7 @@ abstract class ManagedAuthenticationManagerTestBase extends QpidTestCase childAttrs.put(User.NAME, getTestName()); childAttrs.put(User.PASSWORD, "password"); - User user = _authManager.addChild(User.class, childAttrs); + User user = _authManager.addChildAsync(User.class, childAttrs).get(); assertNotNull("User should be created but addChild returned null", user); assertEquals(getTestName(), user.getName()); if(!isPlain()) @@ -159,7 +160,7 @@ abstract class ManagedAuthenticationManagerTestBase extends QpidTestCase } - public void testCreateUser() + public void testCreateUser() throws ExecutionException, InterruptedException { assertEquals("No users should be present before the test starts", 0, _authManager.getChildren(User.class).size()); assertTrue(_authManager.createUser(getTestName(), "password", Collections.<String, String>emptyMap())); @@ -178,7 +179,7 @@ abstract class ManagedAuthenticationManagerTestBase extends QpidTestCase childAttrs.put(User.PASSWORD, "password"); try { - user = _authManager.addChild(User.class, childAttrs); + user = _authManager.addChildAsync(User.class, childAttrs).get(); fail("Should not be able to create a second user with the same name"); } catch(IllegalArgumentException e) diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java index 083187b8db..6efdeace9e 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java @@ -63,6 +63,29 @@ public class VirtualHostNodeRestTest extends QpidRestTestCase assertFalse("Store should not exist after deletion", storePathAsFile.exists()); } + public void testCreateVirtualHostNodeWithVirtualHost() throws Exception + { + String nodeName = "virtualhostnode-" + getTestName(); + + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(VirtualHostNode.NAME, nodeName); + nodeData.put(VirtualHostNode.TYPE, getTestProfileVirtualHostNodeType()); + + nodeData.put("virtualHostInitialConfiguration", "{ \"type\" : \"DERBY\" }"); + + getRestTestHelper().submitRequest("virtualhostnode/" + nodeName, + "PUT", + nodeData, + HttpServletResponse.SC_CREATED); + + + Map<String, Object> virtualhostNode = getRestTestHelper().getJsonAsSingletonList("virtualhostnode/" + nodeName); + Asserts.assertVirtualHostNode(nodeName, virtualhostNode); + + Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList("virtualhost/" + nodeName + "/" + nodeName); + Asserts.assertVirtualHost(nodeName, virtualhost); + } + public void testCreateVirtualHostNodeWithDefaultStorePath() throws Exception { String virtualhostNodeType = getTestProfileVirtualHostNodeType(); |
