diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-17 01:07:34 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-17 01:07:34 +0000 |
| commit | 7177135ca38651943b3701b171ef29e4fa52ad86 (patch) | |
| tree | 4d221e6969f959b7c0c2a3c0736f71bdf935b459 /qpid/java | |
| parent | 359bd6e75abf11027b668d33d2d733b4cd399e38 (diff) | |
| download | qpid-python-7177135ca38651943b3701b171ef29e4fa52ad86.tar.gz | |
QPID-5709 : [Java Broker] Replace exchange registry / factory with use of common configured object mechanism for registration of children
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588126 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
86 files changed, 1678 insertions, 2226 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 87fc10530e..de17acabab 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -127,10 +127,16 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost> { _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); - _messageStore.visitConfiguredObjectRecords(upgraderRecoverer); - - initialiseModel(); + if(isStoreEmpty()) + { + createDefaultExchanges(); + } + else + { + ConfiguredObjectRecordHandler upgraderRecoverer = + new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); + _messageStore.visitConfiguredObjectRecords(upgraderRecoverer); + } new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); @@ -157,8 +163,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost> getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); removeHouseKeepingTasks(); - getQueueRegistry().stopAllAndUnregisterMBeans(); - getExchangeRegistry().clearAndUnregisterMbeans(); + getQueueRegistry().close(); getDtxRegistry().close(); finalState = VirtualHostState.PASSIVE; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index 7b1355aa45..3e23df6d87 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; @@ -145,7 +146,7 @@ public class VirtualHostTest extends QpidTestCase private VirtualHost<?,?,?> createHost(Map<String, Object> attributes) { - ConfiguredObjectFactory factory = new ConfiguredObjectFactory(Model.getInstance()); + ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(Model.getInstance()); ConfiguredObjectTypeFactory vhostFactory = factory.getConfiguredObjectTypeFactory(VirtualHost.class, attributes); attributes = new HashMap<String, Object>(attributes); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java index 038667249e..af2315f919 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java @@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.log4j.LoggingManagementFacade; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.SystemContextImpl; @@ -60,6 +61,7 @@ public class Broker private volatile IApplicationRegistry _applicationRegistry; private EventLogger _eventLogger; private boolean _configuringOwnLogging = false; + private final TaskExecutor _taskExecutor = new TaskExecutor(); protected static class InitException extends RuntimeException { @@ -85,6 +87,8 @@ public class Broker { _applicationRegistry.close(); } + _taskExecutor.stop(); + } finally { @@ -134,10 +138,10 @@ public class Broker } LogRecorder logRecorder = new LogRecorder(); - TaskExecutor taskExecutor = new TaskExecutor(); - taskExecutor.start(); - ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance()); - SystemContext systemContext = new SystemContextImpl(taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options); + + _taskExecutor.start(); + ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); + SystemContext systemContext = new SystemContextImpl(_taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options); BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator(); DurableConfigurationStore store = storeCreator.createStore(systemContext, storeType, options.getInitialConfigurationLocation(), diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index ba8b553b40..634b250d31 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -38,8 +38,10 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.StateChangeListener; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class BindingImpl extends AbstractConfiguredObject<BindingImpl> @@ -48,8 +50,7 @@ public class BindingImpl private final String _bindingKey; private final AMQQueue _queue; private final ExchangeImpl _exchange; - private final Map<String, Object> _arguments; - private final UUID _id; + private Map<String, Object> _arguments; private final AtomicLong _matches = new AtomicLong(); private final BindingLogSubject _logSubject; @@ -81,7 +82,6 @@ public class BindingImpl public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) { super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor()); - _id = id; _bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME); _queue = queue; _exchange = exchange; @@ -198,7 +198,7 @@ public class BindingImpl public String toString() { - return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }"; + return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }"; } public void delete() @@ -229,36 +229,6 @@ public class BindingImpl } @Override - public Object getAttribute(final String name) - { - if(ID.equals(name)) - { - return getId(); - } - else if(NAME.equals(name)) - { - return _bindingKey; - } - else if(DURABLE.equals(name)) - { - return isDurable(); - } - else if(LIFETIME_POLICY.equals(name)) - { - return getLifetimePolicy(); - } - else if(QUEUE.equals(name)) - { - return _queue; - } - else if(EXCHANGE.equals(name)) - { - return _exchange; - } - return super.getAttribute(name); - } - - @Override public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException { @@ -276,4 +246,29 @@ public class BindingImpl { return _exchange.getEventLogger(); } + + public void setArguments(final Map<String, Object> arguments) + { + if(getTaskExecutor().isTaskExecutorThread()) + { + _arguments = arguments; + super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments); + if (isDurable()) + { + VirtualHostImpl<?, ?, ?> vhost = (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class); + vhost.getDurableConfigurationStore().update(true, asObjectRecord()); + } + } + else + { + getTaskExecutor().submitAndWait(new Runnable() + { + @Override + public void run() + { + setArguments(arguments); + } + }); + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java index 43ff07e6d0..4e3601efcc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java @@ -142,6 +142,19 @@ public class TaskExecutor return future; } + public void submitAndWait(final Runnable task) throws CancellationException + { + submitAndWait(new Task<Void>() + { + @Override + public Void call() + { + task.run(); + return null; + } + }); + } + public <T> T submitAndWait(Task<T> task) throws CancellationException { try diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index d135d05e64..b1dd6a3721 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -54,7 +55,6 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -65,6 +65,7 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -111,7 +112,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> super(parentsMap(vhost), attributes, vhost.getTaskExecutor()); _virtualHost = vhost; // check ACL - _virtualHost.getSecurityManager().authoriseCreateExchange(this); + try + { + _virtualHost.getSecurityManager().authoriseCreateExchange(this); + } + catch (AccessControlException e) + { + deleted(); + throw e; + } _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); @@ -129,15 +138,52 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override + public void validate() + { + super.validate(); + + if(!_virtualHost.getSecurityManager().isSystemProcess()) + { + if (isReservedExchangeName(getName())) + { + deleted(); + throw new ReservedExchangeNameException(getName()); + } + } + } + + private boolean isReservedExchangeName(String name) + { + if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name) + || name.startsWith("amq.") || name.startsWith("qpid.")) + { + return true; + } + return false; + } + + + @Override protected void onOpen() { super.onOpen(); - postSetAlternateExchange(); + // Log Exchange creation getEventLogger().message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), isDurable())); } @Override + protected void onCreate() + { + super.onCreate(); + if(isDurable()) + { + DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), this); + } + + } + + @Override public EventLogger getEventLogger() { return _virtualHost.getEventLogger(); @@ -156,8 +202,25 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return getLifetimePolicy() != LifetimePolicy.PERMANENT; } - public void close() + public void delete() { + _virtualHost.getSecurityManager().authoriseDelete(this); + + if(hasReferrers()) + { + throw new ExchangeIsAlternateException(getName()); + } + + if(getExchangeType().getDefaultExchangeName().equals( getName() )) + { + throw new RequiredExchangeException(getName()); + } + + if (isDurable() && !isAutoDelete()) + { + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this); + + } if(_closed.compareAndSet(false,true)) { @@ -180,7 +243,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> task.performAction(this); } _closeTaskList.clear(); + + if (isDurable() && !isAutoDelete()) + { + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this); + + } } + deleted(); + } public String toString() @@ -623,10 +694,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> if (id == null) { - id = UUIDGenerator.generateBindingUUID(getName(), - queue.getName(), - bindingKey, - _virtualHost.getName()); + id = UUID.randomUUID(); } Map<String,Object> attributes = new HashMap<String, Object>(); @@ -636,36 +704,47 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> attributes.put(org.apache.qpid.server.model.Binding.ARGUMENTS, arguments); } - BindingImpl b = new BindingImpl(id, attributes, queue, this); - - BindingImpl existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b); - if (existingMapping == null || force) + BindingImpl existingMapping; + synchronized(this) { - b.addStateChangeListener(_bindingListener); - b.open(); - if (existingMapping != null) - { - existingMapping.delete(); - } + BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, queue); + existingMapping = _bindingsMap.get(bindingIdentifier); - if (b.isDurable() && !restore) + if (existingMapping == null) { - DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); - } + BindingImpl b = new BindingImpl(id, attributes, queue, this); + b.addStateChangeListener(_bindingListener); + b.open(); - queue.addBinding(b); - childAdded(b); + if (b.isDurable() && !restore) + { + DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); + } + _bindingsMap.put(bindingIdentifier, b); + queue.addBinding(b); + childAdded(b); - doAddBinding(b); + doAddBinding(b); - return true; - } - else - { - return false; + return true; + } + else if(force) + { + Map<String,Object> oldArguments = existingMapping.getArguments(); + existingMapping.setArguments(arguments); + onBindingUpdated(existingMapping, oldArguments); + return true; + } + else + { + return false; + } } } + protected abstract void onBindingUpdated(final BindingImpl binding, + final Map<String, Object> oldArguments); + @Override protected boolean setState(final State currentState, final State desiredState) { @@ -803,37 +882,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } @Override - public void delete() - { - try - { - _virtualHost.removeExchange(this,true); - } - catch (ExchangeIsAlternateException e) - { - throw new UnsupportedOperationException(e.getMessage(),e); - } - catch (RequiredExchangeException e) - { - throw new UnsupportedOperationException("'"+e.getMessage()+"' is a reserved exchange and can't be deleted",e); - } - } - - @Override public Object getAttribute(final String name) { if(ConfiguredObject.STATE.equals(name)) { return getState(); } - else if(LIFETIME_POLICY.equals(name)) - { - return getLifetimePolicy(); - } - else if(DURABLE.equals(name)) - { - return isDurable(); - } return super.getAttribute(name); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java deleted file mode 100644 index 4dc9811934..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Logger; - -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.plugin.QpidServiceLoader; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class DefaultExchangeFactory implements ExchangeFactory -{ - public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; - - private static final Logger LOGGER = Logger.getLogger(DefaultExchangeFactory.class); - - private static final String[] BASE_EXCHANGE_TYPES = - new String[]{ExchangeDefaults.DIRECT_EXCHANGE_CLASS, - ExchangeDefaults.FANOUT_EXCHANGE_CLASS, - ExchangeDefaults.HEADERS_EXCHANGE_CLASS, - ExchangeDefaults.TOPIC_EXCHANGE_CLASS}; - - private final VirtualHostImpl _host; - private Map<String, ExchangeType<? extends ExchangeImpl>> _exchangeClassMap = new HashMap<String, ExchangeType<? extends ExchangeImpl>>(); - - public DefaultExchangeFactory(VirtualHostImpl host) - { - _host = host; - - @SuppressWarnings("rawtypes") - Iterable<ExchangeType> exchangeTypes = loadExchangeTypes(); - for (ExchangeType<?> exchangeType : exchangeTypes) - { - String typeName = exchangeType.getType(); - - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Registering exchange type '" + typeName + "' using class '" + exchangeType.getClass().getName() + "'"); - } - - if(_exchangeClassMap.containsKey(typeName)) - { - ExchangeType<?> existingType = _exchangeClassMap.get(typeName); - - throw new IllegalStateException("ExchangeType with type name '" + typeName + "' is already registered using class '" - + existingType.getClass().getName() + "', can not register class '" - + exchangeType.getClass().getName() + "'"); - } - - _exchangeClassMap.put(typeName, exchangeType); - } - - for(String type : BASE_EXCHANGE_TYPES) - { - if(!_exchangeClassMap.containsKey(type)) - { - throw new IllegalStateException("Did not find expected exchange type: " + type); - } - } - } - - @SuppressWarnings("rawtypes") - protected Iterable<ExchangeType> loadExchangeTypes() - { - return new QpidServiceLoader<ExchangeType>().atLeastOneInstanceOf(ExchangeType.class); - } - - public Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes() - { - return _exchangeClassMap.values(); - } - - @Override - public ExchangeImpl createExchange(final Map<String, Object> attributes) - throws AMQUnknownExchangeType, UnknownExchangeException - { - ExchangeImpl exchange = createExchangeInstance(attributes); - ((AbstractExchange)exchange).create(); - return exchange; - } - - private ExchangeImpl createExchangeInstance(final Map<String, Object> attributes) - { - String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes); - ExchangeType<? extends ExchangeImpl> exchType = _exchangeClassMap.get(type); - if (exchType == null) - { - throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null); - } - return exchType.newInstance(_host, attributes); - - } - - @Override - public ExchangeImpl restoreExchange(Map<String,Object> attributes) - throws AMQUnknownExchangeType, UnknownExchangeException - { - ExchangeImpl exchange = createExchangeInstance(attributes); - exchange.open(); - return exchange; - - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java deleted file mode 100644 index a79601bced..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.log4j.Logger; - -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class DefaultExchangeRegistry implements ExchangeRegistry -{ - private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); - /** - * Maps from exchange name to exchange instance - */ - private ConcurrentMap<String, ExchangeImpl<?>> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl<?>>(); - - private MessageDestination _defaultExchange; - - private final VirtualHostImpl _host; - private final QueueRegistry _queueRegistry; - - private final Collection<RegistryChangeListener> _listeners = - Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>()); - - public DefaultExchangeRegistry(VirtualHostImpl host, QueueRegistry queueRegistry) - { - _host = host; - _queueRegistry = queueRegistry; - } - - public void initialise(ExchangeFactory exchangeFactory) - { - //create 'standard' exchanges: - initialiseExchanges(exchangeFactory, getDurableConfigurationStore()); - - _defaultExchange = - new DefaultDestination(_host - ); - - - } - - private void initialiseExchanges(ExchangeFactory factory, DurableConfigurationStore store) - { - for (ExchangeType<? extends ExchangeImpl> type : factory.getRegisteredTypes()) - { - defineExchange(factory, type.getDefaultExchangeName(), type.getType(), store); - } - - } - - private void defineExchange(ExchangeFactory f, String name, String type, DurableConfigurationStore store) - { - try - { - if(getExchange(name) == null) - { - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(org.apache.qpid.server.model.Exchange.ID, - UUIDGenerator.generateExchangeUUID(name, _host.getName())); - attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); - ExchangeImpl<?> exchange = f.createExchange(attributes); - exchange.open(); - registerExchange(exchange); - if(exchange.isDurable()) - { - DurableConfigurationStoreHelper.createExchange(store, exchange); - } - } - } - catch (AMQUnknownExchangeType e) - { - throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " + - "this is because necessary jar files are not on the classpath", e); - } - catch (UnknownExchangeException e) - { - throw new ServerScopedRuntimeException("Unknown alternate exchange type while attempting to initialise " + - "a mandatory exchange which should not have an alternate: '" + - name + "'"); - } - } - - public DurableConfigurationStore getDurableConfigurationStore() - { - return _host.getDurableConfigurationStore(); - } - - public void registerExchange(ExchangeImpl exchange) - { - _exchangeMap.put(exchange.getName(), exchange); - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.exchangeRegistered(exchange); - } - - } - } - - public MessageDestination getDefaultExchange() - { - return _defaultExchange; - } - - public boolean unregisterExchange(String name, boolean inUse) - { - final ExchangeImpl exchange = _exchangeMap.get(name); - if (exchange != null) - { - - _host.getSecurityManager().authoriseDelete(exchange); - - // TODO: check inUse argument - - ExchangeImpl e = _exchangeMap.remove(name); - // if it is null then it was removed by another thread at the same time, we can ignore - if (e != null) - { - e.close(); - - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.exchangeUnregistered(exchange); - } - } - - } - } - return exchange != null; - - } - - public Collection<ExchangeImpl<?>> getExchanges() - { - return new ArrayList<ExchangeImpl<?>>(_exchangeMap.values()); - } - - public void addRegistryChangeListener(RegistryChangeListener listener) - { - _listeners.add(listener); - } - - public ExchangeImpl<?> getExchange(String name) - { - return name == null ? null : _exchangeMap.get(name); - } - - @Override - public void clearAndUnregisterMbeans() - { - for (final ExchangeImpl<?> exchange : getExchanges()) - { - //TODO: this is a bit of a hack, what if the listeners aren't aware - //that we are just unregistering the MBean because of HA, and aren't - //actually removing the exchange as such. - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.exchangeUnregistered(exchange); - } - } - } - _exchangeMap.clear(); - } - - @Override - public synchronized ExchangeImpl<?> getExchange(UUID exchangeId) - { - Collection<ExchangeImpl<?>> exchanges = _exchangeMap.values(); - for (ExchangeImpl<?> exchange : exchanges) - { - if (exchange.getId().equals(exchangeId)) - { - return exchange; - } - } - return null; - - } - - public boolean isReservedExchangeName(String name) - { - if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name) - || name.startsWith("amq.") || name.startsWith("qpid.")) - { - return true; - } - Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = _host.getExchangeTypes(); - for (ExchangeType<? extends ExchangeImpl> type : registeredTypes) - { - if (type.getDefaultExchangeName().equals(name)) - { - return true; - } - } - return false; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 671cbbe7e7..7b011f6cc1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -70,6 +70,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange> recalculateQueues(); } + public synchronized void updateBinding(BindingImpl binding) + { + recalculateQueues(); + } + private void recalculateQueues() { List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); @@ -188,6 +193,19 @@ public class DirectExchange extends AbstractExchange<DirectExchange> } + @Override + protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments) + { + String bindingKey = binding.getBindingKey(); + AMQQueue queue = binding.getAMQQueue(); + + assert queue != null; + assert bindingKey != null; + + BindingSet bindings = _bindingsByKey.get(bindingKey); + bindings.updateBinding(binding); + } + protected void onBind(final BindingImpl binding) { String bindingKey = binding.getBindingKey(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java new file mode 100644 index 0000000000..c5deb7e3b3 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class DirectExchangeFactory extends AbstractConfiguredObjectTypeFactory<DirectExchange> +{ + public DirectExchangeFactory() + { + super(DirectExchange.class); + } + + @Override + public DirectExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + return new DirectExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java deleted file mode 100644 index 2731f665ac..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; - -import java.util.Collection; -import java.util.Map; - - -public interface ExchangeFactory -{ - - Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes(); - - ExchangeImpl createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; - - ExchangeImpl restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException; - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index ff63a83292..57929b7306 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -62,7 +62,7 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap); - void close(); + void delete(); /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java deleted file mode 100644 index 7fa7a64a62..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.server.message.MessageDestination; - -import java.util.Collection; -import java.util.UUID; - - -public interface ExchangeRegistry -{ - void registerExchange(ExchangeImpl<?> exchange); - - MessageDestination getDefaultExchange(); - - void initialise(ExchangeFactory exchangeFactory); - - ExchangeImpl<?> getExchange(String exchangeName); - - /** - * Unregister an exchange - * @param exchange name of the exchange to delete - * @param ifUnused if true, do NOT delete the exchange if it is in use (has queues bound to it) - */ - boolean unregisterExchange(String exchange, boolean ifUnused); - - void clearAndUnregisterMbeans(); - - ExchangeImpl<?> getExchange(UUID exchangeId); - - Collection<ExchangeImpl<?>> getExchanges(); - - void addRegistryChangeListener(RegistryChangeListener listener); - - /** - * Validates the name of user custom exchange. - * <p> - * Return true if the exchange name is reserved and false otherwise. - */ - boolean isReservedExchangeName(String name); - - interface RegistryChangeListener - { - void exchangeRegistered(ExchangeImpl exchange); - void exchangeUnregistered(ExchangeImpl exchange); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 1484480c5d..97c2ebe5e3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -125,6 +125,80 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> } + @Override + protected synchronized void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments) + { + AMQQueue queue = binding.getAMQQueue(); + + if (binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter( + binding.getArguments())) + { + if(oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments)) + { + _unfilteredQueues.add(queue); + if(_queues.containsKey(queue)) + { + _queues.put(queue,_queues.get(queue)+1); + } + else + { + _queues.put(queue, ONE); + } + + // No longer any reason to check filters for this queue + _filteredQueues.remove(queue); + } + // else - nothing has changed, remains unfiltered + } + else + { + HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get()); + + Map<BindingImpl,MessageFilter> bindingsForQueue; + + final MessageFilter messageFilter; + + try + { + messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue()); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e); + return; + } + + + if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments)) + { + bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue())); + } + else // previously unfiltered + { + bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(); + + Integer oldValue = _queues.remove(queue); + if (ONE.equals(oldValue)) + { + // should start checking filters for this queue + _filteredQueues.add(queue); + _unfilteredQueues.remove(queue); + } + else + { + _queues.put(queue, oldValue - 1); + } + + } + bindingsForQueue.put(binding, messageFilter); + filteredBindings.put(binding.getAMQQueue(),bindingsForQueue); + + _filteredBindings.set(filteredBindings); + + } + + } protected synchronized void onBind(final BindingImpl binding) { @@ -156,8 +230,7 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get()); Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue()); - final - MessageFilter messageFilter = + final MessageFilter messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue()); if(bindingsForQueue != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java new file mode 100644 index 0000000000..5884776778 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class FanoutExchangeFactory extends AbstractConfiguredObjectTypeFactory<FanoutExchange> +{ + public FanoutExchangeFactory() + { + super(FanoutExchange.class); + } + + @Override + public FanoutExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + return new FanoutExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 6aa32c8528..10dd33dee1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import java.util.ArrayList; import java.util.LinkedHashSet; +import java.util.ListIterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -158,6 +159,21 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange> } + @Override + protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments) + { + HeadersBinding headersBinding = new HeadersBinding(binding); + ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator(); + while(iter.hasNext()) + { + if(iter.next().equals(headersBinding)) + { + iter.set(headersBinding); + } + } + + } + protected void onUnbind(final BindingImpl binding) { assert binding != null; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java new file mode 100644 index 0000000000..b3f9834889 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class HeadersExchangeFactory extends AbstractConfiguredObjectTypeFactory<HeadersExchange> +{ + public HeadersExchangeFactory() + { + super(HeadersExchange.class); + } + + @Override + public HeadersExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + return new HeadersExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 992d9714cb..683b7b1aa4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -72,6 +72,67 @@ public class TopicExchange extends AbstractExchange<TopicExchange> return TYPE; } + @Override + protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments) + { + final String bindingKey = binding.getBindingKey(); + AMQQueue queue = binding.getAMQQueue(); + Map<String,Object> args = binding.getArguments(); + + assert queue != null; + assert bindingKey != null; + + _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey); + + + String routingKey = TopicNormalizer.normalize(bindingKey); + + try + { + + if (_bindings.containsKey(binding)) + { + Map<String, Object> oldArgs = _bindings.get(binding); + TopicExchangeResult result = _topicExchangeResults.get(routingKey); + + if (FilterSupport.argumentsContainFilter(args)) + { + if (FilterSupport.argumentsContainFilter(oldArgs)) + { + result.replaceQueueFilter(queue, + FilterSupport.createMessageFilter(oldArgs, queue), + FilterSupport.createMessageFilter(args, queue)); + } + else + { + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); + result.removeUnfilteredQueue(queue); + } + } + else + { + if (FilterSupport.argumentsContainFilter(oldArgs)) + { + result.addUnfilteredQueue(queue); + result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue)); + } + else + { + // TODO - fix control flow + return; + } + } + + } + } + catch (AMQInvalidArgumentException e) + { + throw new ConnectionScopedRuntimeException(e); + } + + + } + protected synchronized void registerQueue(final BindingImpl binding) throws AMQInvalidArgumentException { final String bindingKey = binding.getBindingKey(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java new file mode 100644 index 0000000000..550377a02f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class TopicExchangeFactory extends AbstractConfiguredObjectTypeFactory<TopicExchange> +{ + public TopicExchangeFactory() + { + super(TopicExchange.class); + } + + @Override + public TopicExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + return new TopicExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index e11550d575..a04635ec74 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,7 +105,12 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im new ArrayList<ConfigurationChangeListener>(); private final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>> _children = - new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>(); + new ConcurrentHashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>(); + private final Map<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>> _childrenById = + new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>>(); + private final Map<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>> _childrenByName = + new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>>(); + @ManagedAttributeField private final UUID _id; @@ -178,6 +184,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im if(idObj == null) { uuid = UUID.randomUUID(); + attributes = new HashMap<String, Object>(attributes); + attributes.put(ID, uuid); } else { @@ -185,6 +193,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } _id = uuid; + _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this); _attributeTypes = getAttributeTypes(getClass()); _automatedFields = getAutomatedFields(getClass()); @@ -205,6 +214,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im for (Class<? extends ConfiguredObject> childClass : Model.getInstance().getChildTypes(getCategoryClass())) { _children.put(childClass, new CopyOnWriteArrayList<ConfiguredObject<?>>()); + _childrenById.put(childClass, new ConcurrentHashMap<UUID, ConfiguredObject<?>>()); + _childrenByName.put(childClass, new ConcurrentHashMap<String, ConfiguredObject<?>>()); } for(ConfiguredObject<?> parent : parents.values()) @@ -220,7 +231,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im addParent((Class<ConfiguredObject<?>>) entry.getKey(), entry.getValue()); } - _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this); Object durableObj = attributes.get(DURABLE); _durable = AttributeValueConverter.BOOLEAN_CONVERTER.convert(durableObj == null ? _attributeTypes.get(DURABLE).getAnnotation().defaultValue() : durableObj, this); @@ -353,7 +363,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - public void open() + public final void open() { if(_open.compareAndSet(false,true)) { @@ -364,7 +374,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } - public void create() + public final void create() { if(_open.compareAndSet(false,true)) { @@ -391,7 +401,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im }); } - protected void doValidation() + protected final void doValidation() { applyToChildren(new Action<ConfiguredObject<?>>() { @@ -407,7 +417,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im validate(); } - protected void doResolution() + protected final void doResolution() { resolve(); applyToChildren(new Action<ConfiguredObject<?>>() @@ -423,7 +433,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im }); } - protected void doCreation() + protected final void doCreation() { onCreate(); applyToChildren(new Action<ConfiguredObject<?>>() @@ -454,7 +464,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } - public void validate() + public void + validate() { } @@ -868,29 +879,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private <C extends ConfiguredObject> void registerChild(final C child) { + Class categoryClass = child.getCategoryClass(); + UUID childId = child.getId(); + String name = child.getName(); + if(_childrenById.get(categoryClass).containsKey(childId)) + { + throw new DuplicateIdException(child); + } + if(_childrenByName.get(categoryClass).containsKey(name)) + { + Collection<Class<? extends ConfiguredObject>> parentTypes = + new ArrayList<Class<? extends ConfiguredObject>>(Model.getInstance().getParentTypes(categoryClass)); + parentTypes.remove(getCategoryClass()); + boolean duplicate = true; + + C existing = (C) _childrenByName.get(categoryClass).get(name); + for(Class<? extends ConfiguredObject> parentType : parentTypes) + { + ConfiguredObject existingParent = existing.getParent(parentType); + ConfiguredObject childParent = child.getParent(parentType); + duplicate = existingParent == childParent; + if(!duplicate) + { + break; + } + } + + if(duplicate) + { + throw new DuplicateNameException(child); + } + } _children.get(categoryClass).add(child); + _childrenById.get(categoryClass).put(childId,child); + _childrenByName.get(categoryClass).put(name, child); + } protected void deleted() { - for(ConfiguredObject<?> parent : _parents.values()) + for (ConfiguredObject<?> parent : _parents.values()) { - if(parent instanceof AbstractConfiguredObject<?>) + if (parent instanceof AbstractConfiguredObject<?>) { - ((AbstractConfiguredObject<?>)parent).unregisterChild(this); + AbstractConfiguredObject<?> parentObj = (AbstractConfiguredObject<?>) parent; + parentObj.unregisterChild(this); + parentObj.childRemoved(this); } } } - protected <C extends ConfiguredObject> void unregisterChild(final C child) + private <C extends ConfiguredObject> void unregisterChild(final C child) { - _children.get(child.getCategoryClass()).remove(child); + Class categoryClass = child.getCategoryClass(); + _children.get(categoryClass).remove(child); + _childrenById.get(categoryClass).remove(child.getId()); + _childrenByName.get(categoryClass).remove(child.getName()); } + @Override + public final <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id) + { + return (C) _childrenById.get(Model.getCategory(clazz)).get(id); + } + @Override + public final <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name) + { + return (C) _childrenByName.get(Model.getCategory(clazz)).get(name); + } @Override public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz) @@ -1581,7 +1641,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im int oldSize = 0; Model model = Model.getInstance(); - Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes(candidate)); + Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes( + candidate)); while(allDescendants.size() > oldSize) { oldSize = allDescendants.size(); @@ -1661,4 +1722,27 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im throw new ServerScopedRuntimeException("Unable to find attribute definition for method " + method.getName()); } } + + protected final static class DuplicateIdException extends RuntimeException + { + public DuplicateIdException(final ConfiguredObject<?> child) + { + super("Child of type " + child.getClass().getSimpleName() + " already exists with id of " + child.getId()); + } + } + + protected final static class DuplicateNameException extends RuntimeException + { + private final String _name; + public DuplicateNameException(final ConfiguredObject<?> child) + { + super("Child of type " + child.getClass().getSimpleName() + " already exists with name of " + child.getName()); + _name = child.getName(); + } + + public String getName() + { + return _name; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 5285a7b611..dbf138b8a2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -194,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL void setEventLogger(EventLogger eventLogger); AuthenticationProvider<?> getManagementModeAuthenticationProvider(); + + ConfiguredObjectFactory getObjectFactory(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 5480b1f415..54bab071fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -233,6 +233,10 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> */ <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz); + <C extends ConfiguredObject> C getChildById(Class<C> clazz, UUID id); + + <C extends ConfiguredObject> C getChildByName(Class<C> clazz, String name); + <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java index 1c5afa6175..35d7df4385 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java @@ -20,133 +20,25 @@ */ package org.apache.qpid.server.model; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; -import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -public class ConfiguredObjectFactory +public interface ConfiguredObjectFactory { - private final Map<String, String> _defaultTypes = new HashMap<String, String>(); - private final Map<String, Map<String, ConfiguredObjectTypeFactory>> _allFactories = - new HashMap<String, Map<String, ConfiguredObjectTypeFactory>>(); - private final Map<String, Collection<String>> _supportedTypes = new HashMap<String, Collection<String>>(); + <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record, + ConfiguredObject<?>... parents); - private final Model _model; + <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass, + Map<String, Object> attributes); - public ConfiguredObjectFactory(Model model) - { - _model = model; + <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(String category, + String type); - QpidServiceLoader<ConfiguredObjectTypeFactory> serviceLoader = new QpidServiceLoader<ConfiguredObjectTypeFactory>(); - Iterable<ConfiguredObjectTypeFactory> allFactories = serviceLoader.instancesOf(ConfiguredObjectTypeFactory.class); - for(ConfiguredObjectTypeFactory factory : allFactories) - { - final Class<? extends ConfiguredObject> categoryClass = factory.getCategoryClass(); - final String categoryName = categoryClass.getSimpleName(); - - Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(categoryName); - if(categoryFactories == null) - { - categoryFactories = new HashMap<String, ConfiguredObjectTypeFactory>(); - _allFactories.put(categoryName, categoryFactories); - _supportedTypes.put(categoryName, new ArrayList<String>()); - ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class); - if(annotation != null && !"".equals(annotation.defaultType())) - { - _defaultTypes.put(categoryName, annotation.defaultType()); - } - - } - if(categoryFactories.put(factory.getType(),factory) != null) - { - throw new ServerScopedRuntimeException("Misconfiguration - there is more than one factory defined for class " + categoryName - + " with type " + factory.getType()); - } - if(factory.getType() != null) - { - _supportedTypes.get(categoryName).add(factory.getType()); - } - } - } - - public <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record, - ConfiguredObject<?>... parents) - { - String category = record.getType(); - - - String type = (String) record.getAttributes().get(ConfiguredObject.TYPE); - - ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(category, type); - - if(factory == null) - { - throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category + " and type " + type); - } - - return factory.recover(record, parents); - } - - public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String,Object> attributes) - { - final String category = categoryClass.getSimpleName(); - Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category); - if(categoryFactories == null) - { - throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category); - } - String type = (String) attributes.get(ConfiguredObject.TYPE); - - ConfiguredObjectTypeFactory<X> factory; - - if(type != null) - { - factory = getConfiguredObjectTypeFactory(category, type); - } - else - { - factory = getConfiguredObjectTypeFactory(category, null); - if(factory == null) - { - ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class); - factory = getConfiguredObjectTypeFactory(category, annotation.defaultType()); - } - } - return factory; - } - - public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category, final String type) - { - Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category); - if(categoryFactories == null) - { - throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category); - } - ConfiguredObjectTypeFactory factory = categoryFactories.get(type); - if(factory == null) - { - factory = categoryFactories.get(_defaultTypes.get(category)); - } - return factory; - } - - public Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category) - { - return Collections.unmodifiableCollection(_supportedTypes.get(category.getSimpleName())); - } - - - public Model getModel() - { - return _model; - } + Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category); + Model getModel(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java new file mode 100644 index 0000000000..57062cb7a2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; +import org.apache.qpid.server.util.ServerScopedRuntimeException; + +public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory +{ + private final Map<String, String> _defaultTypes = new HashMap<String, String>(); + private final Map<String, Map<String, ConfiguredObjectTypeFactory>> _allFactories = + new HashMap<String, Map<String, ConfiguredObjectTypeFactory>>(); + private final Map<String, Collection<String>> _supportedTypes = new HashMap<String, Collection<String>>(); + + private final Model _model; + + public ConfiguredObjectFactoryImpl(Model model) + { + _model = model; + + QpidServiceLoader<ConfiguredObjectTypeFactory> serviceLoader = new QpidServiceLoader<ConfiguredObjectTypeFactory>(); + Iterable<ConfiguredObjectTypeFactory> allFactories = serviceLoader.instancesOf(ConfiguredObjectTypeFactory.class); + for(ConfiguredObjectTypeFactory factory : allFactories) + { + final Class<? extends ConfiguredObject> categoryClass = factory.getCategoryClass(); + final String categoryName = categoryClass.getSimpleName(); + + Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(categoryName); + if(categoryFactories == null) + { + categoryFactories = new HashMap<String, ConfiguredObjectTypeFactory>(); + _allFactories.put(categoryName, categoryFactories); + _supportedTypes.put(categoryName, new ArrayList<String>()); + ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class); + if(annotation != null && !"".equals(annotation.defaultType())) + { + _defaultTypes.put(categoryName, annotation.defaultType()); + } + + } + if(categoryFactories.put(factory.getType(),factory) != null) + { + throw new ServerScopedRuntimeException("Misconfiguration - there is more than one factory defined for class " + categoryName + + " with type " + factory.getType()); + } + if(factory.getType() != null) + { + _supportedTypes.get(categoryName).add(factory.getType()); + } + } + } + + @Override + public <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record, + ConfiguredObject<?>... parents) + { + String category = record.getType(); + + + String type = (String) record.getAttributes().get(ConfiguredObject.TYPE); + + ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(category, type); + + if(factory == null) + { + throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category + " and type " + type); + } + + return factory.recover(record, parents); + } + + @Override + public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, + Map<String, Object> attributes) + { + final String category = categoryClass.getSimpleName(); + Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category); + if(categoryFactories == null) + { + throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category); + } + String type = (String) attributes.get(ConfiguredObject.TYPE); + + ConfiguredObjectTypeFactory<X> factory; + + if(type != null) + { + factory = getConfiguredObjectTypeFactory(category, type); + } + else + { + factory = getConfiguredObjectTypeFactory(category, null); + if(factory == null) + { + ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class); + factory = getConfiguredObjectTypeFactory(category, annotation.defaultType()); + } + } + return factory; + } + + @Override + public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category, + final String type) + { + Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category); + if(categoryFactories == null) + { + throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category); + } + ConfiguredObjectTypeFactory factory = categoryFactories.get(type); + if(factory == null) + { + factory = categoryFactories.get(_defaultTypes.get(category)); + } + return factory; + } + + @Override + public Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category) + { + return Collections.unmodifiableCollection(_supportedTypes.get(category.getSimpleName())); + } + + + @Override + public Model getModel() + { + return _model; + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java index 95cb614d44..0ddc392430 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java @@ -56,4 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X> @ManagedStatistic long getUnacknowledgedMessages(); + + void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index a9e9225a26..d0b4809303 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -166,11 +166,13 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple String modelVersion = (String) getActualAttributes().get(Broker.MODEL_VERSION); if (modelVersion == null) { + deleted(); throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " must be specified"); } if (!MODEL_VERSION_PATTERN.matcher(modelVersion).matches()) { + deleted(); throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " is specified in incorrect format: " + modelVersion); } @@ -182,12 +184,14 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple if (majorModelVersion != Model.MODEL_MAJOR_VERSION || minorModelVersion > Model.MODEL_MINOR_VERSION) { + deleted(); throw new IllegalConfigurationException("The model version '" + modelVersion + "' in configuration is incompatible with the broker model version '" + Model.MODEL_VERSION + "'"); } if(!isDurable()) { + deleted(); throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } } @@ -1203,6 +1207,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @Override + public ConfiguredObjectFactory getObjectFactory() + { + return _objectFactory; + } + + @Override public void setEventLogger(final EventLogger eventLogger) { _eventLogger = eventLogger; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java index 5dc47f04aa..047ea4ef97 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java @@ -27,6 +27,8 @@ import org.apache.qpid.server.model.ManagedObject; @ManagedObject( category = false, type = "GroupFile" ) public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X> { + String PATH="path"; + @ManagedAttribute( automate = true, mandatory = true) String getPath(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index afcfe93618..be14b284b1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -23,28 +23,49 @@ import java.io.File; import java.io.IOException; import java.security.AccessControlException; import java.security.Principal; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.model.*; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.security.group.FileGroupManager; -import org.apache.qpid.server.security.group.GroupManager; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Group; +import org.apache.qpid.server.model.GroupMember; +import org.apache.qpid.server.model.GroupProvider; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.security.group.FileGroupDatabase; +import org.apache.qpid.server.security.group.GroupPrincipal; import org.apache.qpid.server.util.MapValueConverter; public class FileBasedGroupProviderImpl extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl> { + public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.group.FileGroupProviderAttributeDescriptions"; + public static final String GROUP_FILE_PROVIDER_TYPE = "GroupFile"; private static Logger LOGGER = Logger.getLogger(FileBasedGroupProviderImpl.class); - private GroupManager _groupManager; private final Broker<?> _broker; private AtomicReference<State> _state; + private FileGroupDatabase _groupDatabase; + @ManagedAttributeField private String _path; @@ -100,18 +121,74 @@ public class FileBasedGroupProviderImpl protected void onOpen() { super.onOpen(); - if(_groupManager == null) + if(_groupDatabase == null) + { + _groupDatabase = new FileGroupDatabase(); + try + { + _groupDatabase.setGroupFile(getPath()); + } + catch (IOException e) + { + setState(getState(), State.ERRORED); + LOGGER.warn(("Unable to open preferences file at " + _path)); + } + } + Set<Principal> groups = getGroupPrincipals(); + Collection<Group> principals = new ArrayList<Group>(groups.size()); + for (Principal group : groups) { - _groupManager = new FileGroupManager(getPath()); + Map<String,Object> attrMap = new HashMap<String, Object>(); + UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName()); + attrMap.put(Group.ID, id); + attrMap.put(Group.NAME, group.getName()); + GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor()); + principals.add(groupAdapter); } + } @Override protected void onCreate() { super.onCreate(); - _groupManager = new FileGroupManager(getPath()); - _groupManager.onCreate(); + _groupDatabase = new FileGroupDatabase(); + + File file = new File(_path); + if (!file.exists()) + { + File parent = file.getParentFile(); + if (!parent.exists()) + { + parent.mkdirs(); + } + if (parent.exists()) + { + try + { + file.createNewFile(); + } + catch (IOException e) + { + throw new IllegalConfigurationException("Cannot create group file"); + } + } + else + { + throw new IllegalConfigurationException("Cannot create group file"); + } + } + try + { + _groupDatabase.setGroupFile(getPath()); + } + catch (IOException e) + { + setState(getState(), State.ERRORED); + LOGGER.warn(("Unable to open preferences file at " + _path)); + } + + } @Override @@ -147,12 +224,16 @@ public class FileBasedGroupProviderImpl String groupName = (String) attributes.get(Group.NAME); getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName); - _groupManager.createGroup(groupName); + + _groupDatabase.createGroup(groupName); + Map<String,Object> attrMap = new HashMap<String, Object>(); UUID id = UUIDGenerator.generateGroupUUID(getName(),groupName); attrMap.put(Group.ID, id); attrMap.put(Group.NAME, groupName); - return (C) new GroupAdapter(attrMap, getTaskExecutor()); + GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor()); + groupAdapter.create(); + return (C) groupAdapter; } @@ -161,35 +242,25 @@ public class FileBasedGroupProviderImpl + childClass); } - @SuppressWarnings("unchecked") - @Override - public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) + private Set<Principal> getGroupPrincipals() { - if (clazz == Group.class) + + Set<String> groups = _groupDatabase == null ? Collections.<String>emptySet() : _groupDatabase.getAllGroups(); + if (groups.isEmpty()) { - Set<Principal> groups = _groupManager == null ? Collections.<Principal>emptySet() : _groupManager.getGroupPrincipals(); - Collection<Group> principals = new ArrayList<Group>(groups.size()); - for (Principal group : groups) - { - Map<String,Object> attrMap = new HashMap<String, Object>(); - UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName()); - attrMap.put(Group.ID, id); - attrMap.put(Group.NAME, group.getName()); - principals.add(new GroupAdapter(attrMap, getTaskExecutor())); - } - return (Collection<C>) Collections - .unmodifiableCollection(principals); + return Collections.emptySet(); } else { - return null; + Set<Principal> principals = new HashSet<Principal>(); + for (String groupName : groups) + { + principals.add(new GroupPrincipal(groupName)); + } + return principals; } } - public GroupManager getGroupManager() - { - return _groupManager; - } private SecurityManager getSecurityManager() { @@ -207,7 +278,15 @@ public class FileBasedGroupProviderImpl { try { - _groupManager.open(); + try + { + _groupDatabase.setGroupFile(getPath()); + } + catch (IOException e) + { + throw new IllegalConfigurationException("Unable to set group file " + getPath(), e); + } + return true; } catch(RuntimeException e) @@ -232,7 +311,6 @@ public class FileBasedGroupProviderImpl { if (_state.compareAndSet(state, State.STOPPED)) { - _groupManager.close(); return true; } else @@ -245,8 +323,15 @@ public class FileBasedGroupProviderImpl if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED) && _state.compareAndSet(state, State.DELETED)) { - _groupManager.close(); - _groupManager.onDelete(); + File file = new File(getPath()); + if (file.exists()) + { + if (!file.delete()) + { + throw new IllegalConfigurationException("Cannot delete group file"); + } + } + deleted(); return true; } @@ -267,7 +352,20 @@ public class FileBasedGroupProviderImpl public Set<Principal> getGroupPrincipalsForUser(String username) { - return _groupManager.getGroupPrincipalsForUser(username); + Set<String> groups = _groupDatabase.getGroupsForUser(username); + if (groups.isEmpty()) + { + return Collections.emptySet(); + } + else + { + Set<Principal> principals = new HashSet<Principal>(); + for (String groupName : groups) + { + principals.add(new GroupPrincipal(groupName)); + } + return principals; + } } @Override @@ -337,6 +435,24 @@ public class FileBasedGroupProviderImpl } @Override + protected void onOpen() + { + super.onOpen(); + Set<Principal> usersInGroup = getUserPrincipalsForGroup(getName()); + Collection<GroupMember> members = new ArrayList<GroupMember>(); + for (Principal principal : usersInGroup) + { + UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName()); + Map<String,Object> attrMap = new HashMap<String, Object>(); + attrMap.put(GroupMember.ID,id); + attrMap.put(GroupMember.NAME, principal.getName()); + GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor()); + groupMemberAdapter.open(); + members.add(groupMemberAdapter); + } + } + + @Override protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) { super.validateChange(proxyForValidation, changedAttributes); @@ -346,33 +462,25 @@ public class FileBasedGroupProviderImpl } } - @Override - public <C extends ConfiguredObject> Collection<C> getChildren( - Class<C> clazz) + private Set<Principal> getUserPrincipalsForGroup(String group) { - if (clazz == GroupMember.class) + Set<String> users = _groupDatabase.getUsersInGroup(group); + if (users.isEmpty()) { - Set<Principal> usersInGroup = _groupManager - .getUserPrincipalsForGroup(getName()); - Collection<GroupMember> members = new ArrayList<GroupMember>(); - for (Principal principal : usersInGroup) - { - UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName()); - Map<String,Object> attrMap = new HashMap<String, Object>(); - attrMap.put(GroupMember.ID,id); - attrMap.put(GroupMember.NAME, principal.getName()); - members.add(new GroupMemberAdapter(attrMap, getTaskExecutor())); - } - return (Collection<C>) Collections - .unmodifiableCollection(members); + return Collections.emptySet(); } else { - return null; + Set<Principal> principals = new HashSet<Principal>(); + for (String user : users) + { + principals.add(new UsernamePrincipal(user)); + } + return principals; } - } + @Override public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, @@ -384,12 +492,14 @@ public class FileBasedGroupProviderImpl getSecurityManager().authoriseGroupOperation(Operation.UPDATE, getName()); - _groupManager.addUserToGroup(memberName, getName()); + _groupDatabase.addUserToGroup(memberName, getName()); UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), memberName); Map<String,Object> attrMap = new HashMap<String, Object>(); attrMap.put(GroupMember.ID,id); attrMap.put(GroupMember.NAME, memberName); - return (C) new GroupMemberAdapter(attrMap, getTaskExecutor()); + GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor()); + groupMemberAdapter.create(); + return (C) groupMemberAdapter; } @@ -405,7 +515,8 @@ public class FileBasedGroupProviderImpl if (desiredState == State.DELETED) { getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName()); - _groupManager.removeGroup(getName()); + _groupDatabase.removeGroup(getName()); + deleted(); return true; } @@ -479,7 +590,8 @@ public class FileBasedGroupProviderImpl { getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName()); - _groupManager.removeUserFromGroup(getName(), GroupAdapter.this.getName()); + _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); + deleted(); return true; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 13d80bbe0e..3b55fcb1bd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -24,12 +24,17 @@ import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; - import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.*; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Publisher; +import org.apache.qpid.server.model.Session; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; @@ -59,8 +64,10 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl public void consumerRemoved(final Consumer<?> consumer) { childRemoved(consumer); + } }); + session.setModelObject(this); open(); } @@ -159,6 +166,12 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl return _session.getUnacknowledgedMessageCount(); } + @Override + public void delete() + { + deleted(); + } + @Override protected boolean setState(State currentState, State desiredState) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java index be9240c64c..fd8e680bef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; @@ -133,7 +134,7 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto { if(_configuredObjectFactory == null) { - _configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance()); + _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); } } return _configuredObjectFactory.getConfiguredObjectTypeFactory(Port.class.getSimpleName(), type); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index e7c14a7a4d..0df40bfff6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.Deletable; @@ -94,4 +95,8 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo void addConsumerListener(ConsumerListener listener); void removeConsumerListener(ConsumerListener listener); + + void setModelObject(Session<?> session); + + Session<?> getModelObject(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 8b5ea1e964..9a06569397 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; @@ -191,7 +190,7 @@ public class AMQQueueFactory implements QueueFactory { - args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName())); + args.put(Queue.ID, UUID.randomUUID()); args.put(Queue.NAME, dlQueueName); args.put(Queue.DURABLE, true); dlQueue = _virtualHost.createQueue(args); @@ -269,7 +268,7 @@ public class AMQQueueFactory implements QueueFactory private static String getDeadLetterExchangeName(String name) { - return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index fb5b5dc8bf..7738281034 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -826,6 +826,7 @@ public abstract class AbstractQueue // we need to manually fire the event to the removed consumer (which was the last one left for this // queue. This is because the delete method uses the consumer set which has just been cleared consumer.queueDeleted(); + } } @@ -1645,7 +1646,7 @@ public abstract class AbstractQueue _deleteTaskList.clear(); stop(); - + deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index c2778085d1..a54b3ae7f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.ArrayList; import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -33,8 +32,6 @@ public class DefaultQueueRegistry implements QueueRegistry private ConcurrentMap<String, AMQQueue<?>> _queueMap = new ConcurrentHashMap<String, AMQQueue<?>>(); private final VirtualHostImpl _virtualHost; - private final Collection<RegistryChangeListener> _listeners = - new ArrayList<RegistryChangeListener>(); public DefaultQueueRegistry(VirtualHostImpl virtualHost) { @@ -49,28 +46,11 @@ public class DefaultQueueRegistry implements QueueRegistry public void registerQueue(AMQQueue queue) { _queueMap.put(queue.getName(), queue); - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.queueRegistered(queue); - } - } } public void unregisterQueue(String name) { AMQQueue q = _queueMap.remove(name); - if(q != null) - { - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.queueUnregistered(q); - } - } - } } @@ -84,31 +64,12 @@ public class DefaultQueueRegistry implements QueueRegistry return queue == null ? null : _queueMap.get(queue); } - public void addRegistryChangeListener(RegistryChangeListener listener) - { - synchronized(_listeners) - { - _listeners.add(listener); - } - } - @Override - public void stopAllAndUnregisterMBeans() + public void close() { for (final AMQQueue queue : getQueues()) { queue.stop(); - - //TODO: this is a bit of a hack, what if the listeners aren't aware - //that we are just unregistering the MBean because of HA, and aren't - //actually removing the queue as such. - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.queueUnregistered(queue); - } - } } _queueMap.clear(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index ae1963d2a3..b3d4e4368f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -115,7 +115,7 @@ class QueueConsumerImpl final Class<? extends ServerMessage> messageClass, EnumSet<Option> optionSet) { - super(parentsMap(queue), + super(parentsMap(queue, target.getSessionModel().getModelObject()), createAttributeMap(consumerName, filters, optionSet), queue.getVirtualHost().getTaskExecutor()); _messageClass = messageClass; @@ -255,6 +255,7 @@ class QueueConsumerImpl _target.close(); _target.consumerRemoved(this); _queue.unregisterConsumer(this); + deleted(); } finally { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 747e0d8959..5a90536476 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -37,9 +37,7 @@ public interface QueueRegistry AMQQueue<?> getQueue(String queue); - void addRegistryChangeListener(RegistryChangeListener listener); - - void stopAllAndUnregisterMBeans(); + void close(); AMQQueue<?> getQueue(UUID queueId); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java index 01281676b1..d3eea96e16 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.IntegrityViolationException; import org.apache.qpid.server.model.Model; @@ -158,7 +159,7 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica if(childClass == PreferencesProvider.class) { // TODO RG - get the configured object factory from parents - ConfiguredObjectFactory factory = new ConfiguredObjectFactory(Model.getInstance()); + ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(Model.getInstance()); attributes = new HashMap<String, Object>(attributes); attributes.put(ConfiguredObject.ID, UUID.randomUUID()); final ConfiguredObjectTypeFactory preferencesFactory = @@ -236,7 +237,6 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica { try { - initialise(); if (_preferencesProvider != null) { _preferencesProvider.setDesiredState(_preferencesProvider.getState(), State.ACTIVE); @@ -256,6 +256,10 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } } } + if(state == State.ERRORED) + { + return false; + } else { throw new IllegalStateException("Cannot activate authentication provider in state: " + state); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 3e756f5210..cb3729e4e3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -25,26 +25,35 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.security.AccessControlException; import java.security.Principal; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ExternalFileBasedAuthenticationManager; +import org.apache.qpid.server.model.IllegalStateTransitionException; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.PreferencesProvider; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.User; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.AuthenticationResult; -import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; public abstract class PrincipalDatabaseAuthenticationManager<T extends PrincipalDatabaseAuthenticationManager<T>> @@ -55,6 +64,8 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal private static final Logger LOGGER = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class); + private final Map<Principal, PrincipalAdapter> _userMap = new ConcurrentHashMap<Principal, PrincipalAdapter>(); + private PrincipalDatabase _principalDatabase; @ManagedAttributeField private String _path; @@ -92,6 +103,21 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { super.onOpen(); _principalDatabase = createDatabase(); + try + { + initialise(); + List<Principal> users = + _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); + for (Principal user : users) + { + _userMap.put(user, new PrincipalAdapter(user)); + } + } + catch(IllegalConfigurationException e) + { + updateState(getState(), State.ERRORED); + + } } protected abstract PrincipalDatabase createDatabase(); @@ -202,16 +228,41 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal public boolean createUser(String username, String password, Map<String, String> attributes) { getSecurityManager().authoriseUserOperation(Operation.CREATE, username); - return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray()); + Principal principal = new UsernamePrincipal(username); + boolean created = + getPrincipalDatabase().createPrincipal(principal, password.toCharArray()); + if(created) + { + principal = getPrincipalDatabase().getUser(username); + + _userMap.put(principal, new PrincipalAdapter(principal)); + } + return created; } - @Override - public void deleteUser(String username) throws AccountNotFoundException + + private void deleteUserFromDatabase(String username) throws AccountNotFoundException { getSecurityManager().authoriseUserOperation(Operation.DELETE, username); - getPrincipalDatabase().deletePrincipal(new UsernamePrincipal(username)); + UsernamePrincipal principal = new UsernamePrincipal(username); + getPrincipalDatabase().deletePrincipal(principal); + _userMap.remove(principal); + } + @Override + public void deleteUser(String username) throws AccountNotFoundException + { + UsernamePrincipal principal = new UsernamePrincipal(username); + PrincipalAdapter user = _userMap.get(principal); + if(user != null) + { + user.setState(user.getState(), State.DELETED); + } + else + { + deleteUserFromDatabase(username); + } } private org.apache.qpid.server.security.SecurityManager getSecurityManager() @@ -258,9 +309,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal if(createUser(username, password,null)) { - @SuppressWarnings("unchecked") - C principalAdapter = (C) new PrincipalAdapter(p); - return principalAdapter; + return (C) _userMap.get(p); } else { @@ -276,23 +325,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == User.class) - { - PrincipalDatabase principalDatabase = getPrincipalDatabase(); - List<Principal> users = principalDatabase == null ? Collections.<Principal>emptyList() : principalDatabase.getUsers(); - Collection<User> principals = new ArrayList<User>(users.size()); - for(Principal user : users) - { - principals.add(new PrincipalAdapter(user)); - } - @SuppressWarnings("unchecked") - Collection<C> unmodifiablePrincipals = (Collection<C>) Collections.unmodifiableCollection(principals); - return unmodifiablePrincipals; - } - else - { - return super.getChildren(clazz); - } + return super.getChildren(clazz); } @Override @@ -426,12 +459,13 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal try { String userName = _user.getName(); - deleteUser(userName); + deleteUserFromDatabase(userName); PreferencesProvider preferencesProvider = getPreferencesProvider(); if (preferencesProvider != null) { preferencesProvider.deletePreferences(userName); } + deleted(); } catch (AccountNotFoundException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java index f9d25e3ec0..5628d49949 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java @@ -478,8 +478,8 @@ public class ScramSHA1AuthenticationManager { _authenticationManager.getSecurityManager().authoriseUserOperation(Operation.DELETE, getName()); _authenticationManager._users.remove(getName()); - _authenticationManager.unregisterChild(this); - _authenticationManager.childRemoved(this); + _authenticationManager.deleted(); + deleted(); return true; } else diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java deleted file mode 100644 index e11a4f83db..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security.group; - -import java.io.File; -import java.io.IOException; -import java.security.Principal; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.security.auth.UsernamePrincipal; - -/** - * Implementation of a group manager whose implementation is backed by a flat group file. - * <p> - * This plugin is configured in the following manner: - * </p> - * <pre> - * "groupproviders":[ - * ... - * { - * "name" : "...", - * "type" : "GroupFile", - * "path" : "path/to/file/with/groups", - * } - * ... - * ] - * </pre> - */ -public class FileGroupManager implements GroupManager -{ - private final FileGroupDatabase _groupDatabase; - private final String _groupFile; - - public FileGroupManager(String groupFile) - { - _groupFile = groupFile; - _groupDatabase = new FileGroupDatabase(); - } - - @Override - public Set<Principal> getGroupPrincipalsForUser(String userId) - { - Set<String> groups = _groupDatabase.getGroupsForUser(userId); - if (groups.isEmpty()) - { - return Collections.emptySet(); - } - else - { - Set<Principal> principals = new HashSet<Principal>(); - for (String groupName : groups) - { - principals.add(new GroupPrincipal(groupName)); - } - return principals; - } - } - - @Override - public Set<Principal> getUserPrincipalsForGroup(String group) - { - Set<String> users = _groupDatabase.getUsersInGroup(group); - if (users.isEmpty()) - { - return Collections.emptySet(); - } - else - { - Set<Principal> principals = new HashSet<Principal>(); - for (String user : users) - { - principals.add(new UsernamePrincipal(user)); - } - return principals; - } - } - - @Override - public Set<Principal> getGroupPrincipals() - { - Set<String> groups = _groupDatabase.getAllGroups(); - if (groups.isEmpty()) - { - return Collections.emptySet(); - } - else - { - Set<Principal> principals = new HashSet<Principal>(); - for (String groupName : groups) - { - principals.add(new GroupPrincipal(groupName)); - } - return principals; - } - } - - @Override - public void createGroup(String group) - { - _groupDatabase.createGroup(group); - } - - @Override - public void removeGroup(String group) - { - _groupDatabase.removeGroup(group); - } - - @Override - public void addUserToGroup(String user, String group) - { - _groupDatabase.addUserToGroup(user, group); - } - - @Override - public void removeUserFromGroup(String user, String group) - { - _groupDatabase.removeUserFromGroup(user, group); - - } - - @Override - public void onDelete() - { - File file = new File(_groupFile); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } - } - - @Override - public void onCreate() - { - File file = new File(_groupFile); - if (!file.exists()) - { - File parent = file.getParentFile(); - if (!parent.exists()) - { - parent.mkdirs(); - } - if (parent.exists()) - { - try - { - file.createNewFile(); - } - catch (IOException e) - { - throw new IllegalConfigurationException("Cannot create group file"); - } - } - else - { - throw new IllegalConfigurationException("Cannot create group file"); - } - } - } - - @Override - public void open() - { - try - { - _groupDatabase.setGroupFile(_groupFile); - } - catch (IOException e) - { - throw new IllegalConfigurationException("Unable to set group file " + _groupFile, e); - } - } - - @Override - public void close() - { - // no-op - } - - @Override - public int hashCode() - { - return ((_groupFile == null) ? 0 : _groupFile.hashCode()); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (getClass() != obj.getClass()) - { - return false; - } - FileGroupManager other = (FileGroupManager) obj; - if (_groupFile == null) - { - if (other._groupFile != null) - { - return false; - } - else - { - return true; - } - } - return _groupFile.equals(other._groupFile); - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java deleted file mode 100644 index 50f08623cd..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.security.group; - -import static org.apache.qpid.server.util.MapValueConverter.getStringAttribute; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; - -import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.plugin.GroupManagerFactory; -import org.apache.qpid.server.util.ResourceBundleLoader; - -public class FileGroupManagerFactory implements GroupManagerFactory -{ - public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.group.FileGroupProviderAttributeDescriptions"; - - public static final String GROUP_FILE_PROVIDER_TYPE = "GroupFile"; - public static final String PATH = "path"; - - public static final Collection<String> ATTRIBUTES = Collections.<String> unmodifiableList(Arrays.asList( - ATTRIBUTE_TYPE, - PATH - )); - - @Override - public GroupManager createInstance(Map<String, Object> attributes) - { - if(attributes == null || !GROUP_FILE_PROVIDER_TYPE.equals(attributes.get(ATTRIBUTE_TYPE))) - { - return null; - } - - String groupFile = getStringAttribute(PATH, attributes, null); - if (groupFile == null || "".equals(groupFile.trim())) - { - throw new IllegalConfigurationException("Path to file containing groups is not specified!"); - } - - return new FileGroupManager(groupFile); - } - - @Override - public String getType() - { - return GROUP_FILE_PROVIDER_TYPE; - } - - @Override - public Collection<String> getAttributeNames() - { - return ATTRIBUTES; - } - - @Override - public Map<String, String> getAttributeDescriptions() - { - return ResourceBundleLoader.getResources(RESOURCE_BUNDLE); - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index c28c440a87..c49f626242 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -42,15 +42,13 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.AMQUnknownExchangeType; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.DefaultDestination; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; @@ -64,7 +62,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.model.adapter.VirtualHostAliasAdapter; -import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -78,6 +76,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; @@ -85,6 +84,7 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -106,10 +106,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final QueueRegistry _queueRegistry; - private final ExchangeRegistry _exchangeRegistry; - - private final ExchangeFactory _exchangeFactory; - private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; @@ -166,6 +162,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @ManagedAttributeField private String _securityAcl; + private MessageDestination _defaultDestination; public AbstractVirtualHost(final Map<String, Object> attributes, Broker<?> broker) @@ -186,9 +183,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _queueFactory = new AMQQueueFactory(this, _queueRegistry); - _exchangeFactory = new DefaultExchangeFactory(this); - - _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry); + _defaultDestination = new DefaultDestination(this); } @@ -294,6 +289,40 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte abstract protected void initialiseStorage(org.apache.qpid.server.model.VirtualHost<?,?,?> virtualHost); + protected boolean isStoreEmpty() + { + final IsStoreEmptyHandler isStoreEmptyHandler = new IsStoreEmptyHandler(); + + getDurableConfigurationStore().visitConfiguredObjectRecords(isStoreEmptyHandler); + + return isStoreEmptyHandler.isEmpty(); + } + + protected void createDefaultExchanges() + { + Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); + addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + return null; + } + + void addStandardExchange(String name, String type) + { + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(Exchange.NAME, name); + attributes.put(Exchange.TYPE, type); + attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); + childAdded(addExchange(attributes)); + } + }); + } + abstract protected MessageStoreLogSubject getMessageStoreLogSubject(); public IConnectionRegistry getConnectionRegistry() @@ -387,11 +416,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Exchange.class) - { - return (Collection<C>) getExchanges(); - } - else if(clazz == Queue.class) + if(clazz == Queue.class) { return (Collection<C>) getQueues(); } @@ -405,12 +430,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } else { - return Collections.emptySet(); + return super.getChildren(clazz); } } @Override - public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { checkVHostStateIsActive(); if(childClass == Exchange.class) @@ -436,13 +461,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public Collection<String> getExchangeTypeNames() { - Collection<String> exchangeTypes = new ArrayList<String>(); - - for(ExchangeType<? extends ExchangeImpl> type : getExchangeTypes()) - { - exchangeTypes.add(type.getType()); - } - return Collections.unmodifiableCollection(exchangeTypes); + return getObjectFactory().getSupportedTypes(Exchange.class); } @@ -540,20 +559,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - protected void initialiseModel() - { - Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - _exchangeRegistry.initialise(_exchangeFactory); - return null; - } - }); - } - - public long getCreateTime() { return _createTime; @@ -564,63 +569,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _queueRegistry; } - protected ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - protected ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - @Override - public void addVirtualHostListener(final VirtualHostListener listener) - { - _exchangeRegistry.addRegistryChangeListener(new ExchangeRegistry.RegistryChangeListener() - { - @Override - public void exchangeRegistered(ExchangeImpl exchange) - { - listener.exchangeRegistered(exchange); - } - - @Override - public void exchangeUnregistered(ExchangeImpl exchange) - { - listener.exchangeUnregistered(exchange); - } - }); - _queueRegistry.addRegistryChangeListener(new QueueRegistry.RegistryChangeListener() - { - @Override - public void queueRegistered(AMQQueue queue) - { - listener.queueRegistered(queue); - } - - @Override - public void queueUnregistered(AMQQueue queue) - { - listener.queueUnregistered(queue); - } - }); - _connectionRegistry.addRegistryChangeListener(new IConnectionRegistry.RegistryChangeListener() - { - @Override - public void connectionRegistered(AMQConnectionModel connection) - { - listener.connectionRegistered(connection); - } - - @Override - public void connectionUnregistered(AMQConnectionModel connection) - { - listener.connectionUnregistered(connection); - } - }); - } - @Override public AMQQueue<?> getQueue(String name) { @@ -716,7 +664,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if(!attributes.containsKey(Queue.ID)) { - UUID id = UUIDGenerator.generateQueueUUID(queueName, getName()); + UUID id = UUID.randomUUID(); while(_queueRegistry.getQueue(id) != null) { id = UUID.randomUUID(); @@ -749,31 +697,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public ExchangeImpl getExchange(String name) { - return _exchangeRegistry.getExchange(name); + return getChildByName(ExchangeImpl.class,name); } @Override public ExchangeImpl getExchange(UUID id) { - return _exchangeRegistry.getExchange(id); + return getChildById(ExchangeImpl.class, id); } @Override public MessageDestination getDefaultDestination() { - return _exchangeRegistry.getDefaultExchange(); + return _defaultDestination; } @Override public Collection<ExchangeImpl<?>> getExchanges() { - return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges()); - } - - @Override - public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes() - { - return _exchangeFactory.getRegisteredTypes(); + Collection children = getChildren(Exchange.class); + return children; } public ExchangeImpl<?> createExchange(final String name, @@ -889,69 +832,27 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte throws ExchangeExistsException, ReservedExchangeNameException, UnknownExchangeException, AMQUnknownExchangeType { - String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes); - if(attributes.get(Exchange.DURABLE) == null) - { - attributes = new HashMap<String, Object>(attributes); - attributes.put(Exchange.DURABLE, false); - } - boolean durable = - MapValueConverter.getBooleanAttribute(Exchange.DURABLE, attributes); + Broker<?> broker = getParent(Broker.class); + ConfiguredObjectTypeFactory<? extends Exchange> factory = + broker.getObjectFactory().getConfiguredObjectTypeFactory(Exchange.class, attributes); - synchronized (_exchangeRegistry) + try { - ExchangeImpl existing; - if((existing = _exchangeRegistry.getExchange(name)) !=null) - { - throw new ExchangeExistsException(name,existing); - } - if(_exchangeRegistry.isReservedExchangeName(name)) - { - throw new ReservedExchangeNameException(name); - } - - - if(attributes.get(org.apache.qpid.server.model.Exchange.ID) == null) - { - attributes = new LinkedHashMap<String, Object>(attributes); - attributes.put(org.apache.qpid.server.model.Exchange.ID, - UUIDGenerator.generateExchangeUUID(name, getName())); - } - - ExchangeImpl exchange = _exchangeFactory.createExchange(attributes); - - _exchangeRegistry.registerExchange(exchange); - if(durable) - { - DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), exchange); - } - return exchange; + return (ExchangeImpl) factory.create(attributes, this); + } + catch (DuplicateNameException e) + { + throw new ExchangeExistsException(getExchange(e.getName())); } + } @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException { - if(exchange.hasReferrers()) - { - throw new ExchangeIsAlternateException(exchange.getName()); - } - - for(ExchangeType type : getExchangeTypes()) - { - if(type.getDefaultExchangeName().equals( exchange.getName() )) - { - throw new RequiredExchangeException(exchange.getName()); - } - } - _exchangeRegistry.unregisterExchange(exchange.getName(), !force); - if (exchange.isDurable() && !exchange.isAutoDelete()) - { - DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), exchange); - } - + exchange.delete(); } public SecurityManager getSecurityManager() @@ -959,18 +860,21 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _broker.getSecurityManager(); } + @Override + public ConfiguredObjectFactory getObjectFactory() + { + return _broker.getObjectFactory(); + } + public void close() { //Stop Connections _connectionRegistry.close(); - _queueRegistry.stopAllAndUnregisterMBeans(); + _queueRegistry.close(); _dtxRegistry.close(); closeStorage(); shutdownHouseKeeping(); - // clear exchange objects - _exchangeRegistry.clearAndUnregisterMbeans(); - _state = VirtualHostState.STOPPED; _eventLogger.message(VirtualHostMessages.CLOSED(getName())); @@ -1220,9 +1124,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(this, getExchangeRegistry(), _queueFactory), - new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()), - new BindingRecoverer(this, getExchangeRegistry()) + new QueueRecoverer(this, _queueFactory), + new ExchangeRecoverer(this), + new BindingRecoverer(this) }; final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); @@ -1233,6 +1137,35 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return recovererMap; } + private static class IsStoreEmptyHandler implements ConfiguredObjectRecordHandler + { + private boolean _empty = true; + + @Override + public void begin() + { + } + + @Override + public boolean handle(final ConfiguredObjectRecord record) + { + // if there is a non vhost record then the store is not empty and we can stop looking at the records + _empty = record.getType().equals(VirtualHost.class.getSimpleName()); + return _empty; + } + + @Override + public void end() + { + + } + + public boolean isEmpty() + { + return _empty; + } + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() @@ -1425,12 +1358,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } else if(SUPPORTED_EXCHANGE_TYPES.equals(name)) { - List<String> types = new ArrayList<String>(); - for(ExchangeType<?> type : getExchangeTypes()) - { - types.add(type.getType()); - } - return Collections.unmodifiableCollection(types); + return getObjectFactory().getSupportedTypes(Exchange.class); } else if(SUPPORTED_QUEUE_TYPES.equals(name)) { @@ -1443,12 +1371,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public Collection<String> getSupportedExchangeTypes() { - List<String> types = new ArrayList<String>(); - for(ExchangeType<?> type : getExchangeTypes()) - { - types.add(type.getType()); - } - return Collections.unmodifiableCollection(types); + return getObjectFactory().getSupportedTypes(Exchange.class); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 11623bd36b..a976db05f6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -24,10 +24,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; + import org.apache.log4j.Logger; + import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; @@ -40,13 +41,10 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B { private static final Logger _logger = Logger.getLogger(BindingRecoverer.class); - private final ExchangeRegistry _exchangeRegistry; private final VirtualHostImpl _virtualHost; - public BindingRecoverer(final VirtualHostImpl virtualHost, - final ExchangeRegistry exchangeRegistry) + public BindingRecoverer(final VirtualHostImpl virtualHost) { - _exchangeRegistry = exchangeRegistry; _virtualHost = virtualHost; } @@ -81,7 +79,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B _bindingId = record.getId(); _exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId(); _queueId = record.getParents().get(Queue.class.getSimpleName()).getId(); - _exchange = _exchangeRegistry.getExchange(_exchangeId); + _exchange = _virtualHost.getExchange(_exchangeId); if(_exchange == null) { _unresolvedDependencies.add(new ExchangeDependency()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java index 695242726d..109bf85aef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java @@ -25,9 +25,9 @@ public class ExchangeExistsException extends RuntimeException { private final ExchangeImpl _existing; - public ExchangeExistsException(String name, ExchangeImpl existing) + public ExchangeExistsException(ExchangeImpl existing) { - super(name); + super(existing.getName()); _existing = existing; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java index 4674a4a534..3e70cd865a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.virtualhost; -public class ExchangeIsAlternateException extends Exception +public class ExchangeIsAlternateException extends RuntimeException { public ExchangeIsAlternateException(String name) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 4431fc786d..4bf7635513 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -22,26 +22,32 @@ package org.apache.qpid.server.virtualhost; import java.util.HashMap; import java.util.Map; -import java.util.UUID; + import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<ExchangeImpl> { - private final ExchangeRegistry _exchangeRegistry; - private final ExchangeFactory _exchangeFactory; + private final VirtualHostImpl<?,?,?> _vhost; + private final ConfiguredObjectFactory _objectFactory; - public ExchangeRecoverer(final ExchangeRegistry exchangeRegistry, final ExchangeFactory exchangeFactory) + public ExchangeRecoverer(final VirtualHostImpl vhost) { - _exchangeRegistry = exchangeRegistry; - _exchangeFactory = exchangeFactory; + _vhost = vhost; + Broker<?> broker = _vhost.getParent(Broker.class); + SystemContext<?> systemContext = broker.getParent(SystemContext.class); + _objectFactory = systemContext.getObjectFactory(); } @Override @@ -53,31 +59,36 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< @Override public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final ConfiguredObjectRecord record) { - return new UnresolvedExchange(record.getId(), record.getAttributes()); + return new UnresolvedExchange(record); } private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl> { private ExchangeImpl<?> _exchange; - public UnresolvedExchange(final UUID id, - final Map<String, Object> attributeMap) + public UnresolvedExchange(ConfiguredObjectRecord record) { + Map<String,Object> attributeMap = record.getAttributes(); String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME); try { - _exchange = _exchangeRegistry.getExchange(id); + _exchange = _vhost.getExchange(record.getId()); if(_exchange == null) { - _exchange = _exchangeRegistry.getExchange(exchangeName); + _exchange = _vhost.getExchange(exchangeName); } if (_exchange == null) { Map<String,Object> attributesWithId = new HashMap<String,Object>(attributeMap); - attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,id); + attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,record.getId()); attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true); - _exchange = _exchangeFactory.restoreExchange(attributesWithId); - _exchangeRegistry.registerExchange(_exchange); + + ConfiguredObjectTypeFactory<? extends Exchange> configuredObjectTypeFactory = + _objectFactory.getConfiguredObjectTypeFactory(Exchange.class, attributesWithId); + UnresolvedConfiguredObject<? extends Exchange> unresolvedConfiguredObject = + configuredObjectTypeFactory.recover(record, _vhost); + _exchange = (ExchangeImpl<?>) unresolvedConfiguredObject.resolve(); + } } catch (AMQUnknownExchangeType e) @@ -101,6 +112,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< @Override public ExchangeImpl resolve() { + _exchange.open(); return _exchange; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index d7fc08f249..788ae8ac9f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -20,14 +20,15 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.LinkedHashMap; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; + import org.apache.log4j.Logger; + import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueFactory; @@ -40,15 +41,12 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ { private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); private final VirtualHostImpl _virtualHost; - private final ExchangeRegistry _exchangeRegistry; private final QueueFactory _queueFactory; public QueueRecoverer(final VirtualHostImpl virtualHost, - final ExchangeRegistry exchangeRegistry, final QueueFactory queueFactory) { _virtualHost = virtualHost; - _exchangeRegistry = exchangeRegistry; _queueFactory = queueFactory; } @@ -82,7 +80,7 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ _id = id; if (_alternateExchangeId != null) { - _alternateExchange = _exchangeRegistry.getExchange(_alternateExchangeId); + _alternateExchange = _virtualHost.getExchange(_alternateExchangeId); if(_alternateExchange == null) { _dependencies.add(new AlternateExchangeDependency()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java index 5073c558da..35824233c9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.virtualhost; -public class RequiredExchangeException extends Exception +public class RequiredExchangeException extends RuntimeException { public RequiredExchangeException(String name) { - super(name); + super("'" + name + "' is a reserved exchange and can't be deleted"); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 6f75a67197..e2454b0d18 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -126,18 +126,21 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED()); Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings(); - String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE); + String configurationStoreType = configurationStoreSettings == null + ? null + : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE); _durableConfigurationStore = initialiseConfigurationStore(configurationStoreType); boolean combinedStores = _durableConfigurationStore == _messageStore; if (combinedStores) { - configurationStoreSettings = new HashMap<String,Object>(messageStoreSettings); + configurationStoreSettings = new HashMap<String, Object>(messageStoreSettings); configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true); } if (!combinedStores) { - _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName()); + _configurationStoreLogSubject = + new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName()); getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED()); } @@ -145,26 +148,33 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost _messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings()); - getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); + getEventLogger().message(_messageStoreLogSubject, + MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); if (_configurationStoreLogSubject != null) { - getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString())); + getEventLogger().message(_configurationStoreLogSubject, + ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString())); getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START()); } - ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); - _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer); + if (isStoreEmpty()) + { + createDefaultExchanges(); + } + else + { + ConfiguredObjectRecordHandler upgraderRecoverer = + new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers()); + _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer); + } if (_configurationStoreLogSubject != null) { getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE()); } - // If store does not have entries for standard exchanges (amq.*), the following will create them. - initialiseModel(); - new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); attainActivation(); @@ -193,4 +203,5 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost { return _configurationStoreLogSubject; } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 9a1390d2e8..91b1a9e408 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -34,8 +34,8 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.EventLoggerProvider; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -50,6 +50,8 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM EventLoggerProvider, VirtualHost<X,Q,E> { + String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; + IConnectionRegistry getConnectionRegistry(); String getName(); @@ -82,15 +84,13 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM Collection<E> getExchanges(); - Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes(); - DurableConfigurationStore getDurableConfigurationStore(); MessageStore getMessageStore(); SecurityManager getSecurityManager(); - void addVirtualHostListener(VirtualHostListener listener); + ConfiguredObjectFactory getObjectFactory(); void close(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java deleted file mode 100644 index af8b0c8f29..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.queue.AMQQueue; - -public interface VirtualHostListener -{ - - public void queueRegistered(AMQQueue queue); - - public void queueUnregistered(AMQQueue queue); - - public void connectionRegistered(AMQConnectionModel connection); - - public void connectionUnregistered(AMQConnectionModel connection); - - public void exchangeRegistered(ExchangeImpl exchange); - - public void exchangeUnregistered(ExchangeImpl exchange); -} diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory index c2f1353671..a5dfec8a29 100644 --- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory +++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory @@ -35,6 +35,10 @@ org.apache.qpid.server.model.adapter.BrokerAdapterFactory org.apache.qpid.server.model.adapter.StandardVirtualHostFactory org.apache.qpid.server.model.adapter.FileBasedGroupProviderFactory org.apache.qpid.server.model.adapter.FileSystemPreferencesProviderFactory +org.apache.qpid.server.exchange.DirectExchangeFactory +org.apache.qpid.server.exchange.FanoutExchangeFactory +org.apache.qpid.server.exchange.HeadersExchangeFactory +org.apache.qpid.server.exchange.TopicExchangeFactory diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory deleted file mode 100644 index 6bfb55ff18..0000000000 --- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.server.security.group.FileGroupManagerFactory diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java index 8df5576715..43b1ed508a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java @@ -40,7 +40,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.SystemContextImpl; @@ -53,6 +53,7 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase private File _userStoreLocation; private BrokerConfigurationStoreCreator _storeCreator; private SystemContext _systemContext; + private TaskExecutor _taskExecutor; public void setUp() throws Exception { @@ -69,8 +70,10 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase _userStoreLocation = new File(TMP_FOLDER, "_store_" + System.currentTimeMillis() + "_" + getTestName()); final BrokerOptions brokerOptions = mock(BrokerOptions.class); when(brokerOptions.getConfigurationStoreLocation()).thenReturn(_userStoreLocation.getAbsolutePath()); - _systemContext = new SystemContextImpl(new TaskExecutor(), - new ConfiguredObjectFactory(Model.getInstance()), + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + _systemContext = new SystemContextImpl(_taskExecutor, + new ConfiguredObjectFactoryImpl(Model.getInstance()), mock(EventLogger.class), mock(LogRecorder.class), brokerOptions); @@ -81,6 +84,7 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase try { super.tearDown(); + _taskExecutor.stop(); } finally { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java index 04e41aa584..2a718b5c15 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Port; @@ -66,14 +67,17 @@ public class BrokerRecovererTest extends TestCase private UUID _authenticationProvider1Id = UUID.randomUUID(); private SystemContext _systemContext; private ConfiguredObjectFactory _configuredObjectFactory; + private TaskExecutor _taskExecutor; @Override protected void setUp() throws Exception { super.setUp(); - _configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance()); - _systemContext = new SystemContextImpl(mock(TaskExecutor.class), + _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + _systemContext = new SystemContextImpl(_taskExecutor, _configuredObjectFactory, mock(EventLogger.class), mock(LogRecorder.class), mock(BrokerOptions.class)); when(_brokerEntry.getId()).thenReturn(_brokerId); @@ -93,6 +97,13 @@ public class BrokerRecovererTest extends TestCase _brokerEntryChildren.put(AuthenticationProvider.class.getSimpleName(), Arrays.asList(_authenticationProviderEntry1)); } + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + _taskExecutor.stop(); + } + public void testCreateBrokerAttributes() { Map<String, Object> attributes = new HashMap<String, Object>(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java index d9c0220e5f..07e9cecb8b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.KeyStore; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; @@ -51,7 +52,7 @@ public class FileKeyStoreCreationTest extends TestCase public void setUp() throws Exception { super.setUp(); - _factory = new ConfiguredObjectFactory(Model.getInstance()); + _factory = new ConfiguredObjectFactoryImpl(Model.getInstance()); } public void testCreateWithAllAttributesProvided() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java index 42b66cbb85..ea1d22f9ef 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java @@ -35,6 +35,9 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; +import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; @@ -49,8 +52,11 @@ public class VirtualHostCreationTest extends TestCase { SecurityManager securityManager = mock(SecurityManager.class); ConfigurationEntry entry = mock(ConfigurationEntry.class); + ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); Broker parent = mock(Broker.class); + when(parent.getObjectFactory()).thenReturn(objectFactory); when(parent.getSecurityManager()).thenReturn(securityManager); + when(parent.getCategoryClass()).thenReturn(Broker.class); VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class); when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class)); when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java index 4f8e0d99dc..82ced3c274 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.ConfigurationEntry; import org.apache.qpid.server.configuration.ConfigurationEntryImpl; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -58,10 +59,15 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase private Map<String, Object> _virtualHostAttributes; private Map<String, Object> _authenticationProviderAttributes; + private TaskExecutor _taskExecutor; + public void setUp() throws Exception { super.setUp(); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + _brokerId = UUID.randomUUID(); _brokerAttributes = new HashMap<String, Object>(); _brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); @@ -85,6 +91,18 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase addConfiguration(_authenticationProviderId, AuthenticationProvider.class.getSimpleName(), _authenticationProviderAttributes); } + @Override + public void tearDown() throws Exception + { + super.tearDown(); + _taskExecutor.stop(); + } + + protected TaskExecutor getTaskExecutor() + { + return _taskExecutor; + } + // ??? perhaps it should not be abstract protected abstract MemoryConfigurationEntryStore createStore(UUID brokerId, Map<String, Object> brokerAttributes) throws Exception; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java index e6d47e6966..d3dc996d3e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java @@ -42,11 +42,10 @@ import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.ConfigurationEntry; import org.apache.qpid.server.configuration.ConfigurationEntryImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.PreferencesProvider; import org.apache.qpid.server.model.SystemContext; @@ -100,8 +99,8 @@ public class JsonConfigurationEntryStoreTest extends ConfigurationEntryStoreTest { final BrokerOptions brokerOptions = mock(BrokerOptions.class); when(brokerOptions.getConfigurationStoreLocation()).thenReturn(absolutePath); - SystemContext context = new SystemContextImpl(new TaskExecutor(), - new ConfiguredObjectFactory(Model.getInstance()), + SystemContext context = new SystemContextImpl(getTaskExecutor(), + new ConfiguredObjectFactoryImpl(Model.getInstance()), mock(EventLogger.class), mock(LogRecorder.class), brokerOptions); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java index 7859a4110b..50169d9f4f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java @@ -44,7 +44,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; @@ -67,6 +67,7 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase private ConfiguredObjectRecord _portEntry; private UUID _rootId, _portEntryId; private SystemContext _systemContext; + private TaskExecutor _taskExecutor; protected void setUp() throws Exception { @@ -74,9 +75,10 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase _rootId = UUID.randomUUID(); _portEntryId = UUID.randomUUID(); _store = mock(DurableConfigurationStore.class); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); - - _systemContext = new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory(Model.getInstance()), mock( + _systemContext = new SystemContextImpl(_taskExecutor, new ConfiguredObjectFactoryImpl(Model.getInstance()), mock( EventLogger.class), mock(LogRecorder.class), new BrokerOptions()); @@ -113,6 +115,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase _handler.openConfigurationStore(_systemContext,Collections.<String,Object>emptyMap()); } + @Override + public void tearDown() throws Exception + { + _taskExecutor.stop(); + super.tearDown(); + } + private ConfiguredObjectRecord getRootEntry() { BrokerFinder brokerFinder = new BrokerFinder(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java index bf8da6f364..92d42d779f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java @@ -36,11 +36,10 @@ import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.ConfigurationEntry; import org.apache.qpid.server.configuration.ConfigurationEntryImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.SystemContextImpl; @@ -53,7 +52,7 @@ public class MemoryConfigurationEntryStoreTest extends ConfigurationEntryStoreTe public void setUp() throws Exception { super.setUp(); - _systemContext = new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory(Model.getInstance()), + _systemContext = new SystemContextImpl(getTaskExecutor(), new ConfiguredObjectFactoryImpl(Model.getInstance()), mock(EventLogger.class), mock(LogRecorder.class), new BrokerOptions()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 7fd4f05a14..269d36e5c1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.consumer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.net.SocketAddress; import java.security.Principal; import java.util.ArrayList; @@ -40,6 +43,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -229,6 +233,13 @@ public class MockConsumer implements ConsumerTarget private static class MockSessionModel implements AMQSessionModel { private final UUID _id = UUID.randomUUID(); + private Session _modelObject; + + private MockSessionModel() + { + _modelObject = mock(Session.class); + when(_modelObject.getCategoryClass()).thenReturn(Session.class); + } @Override public UUID getId() @@ -352,6 +363,18 @@ public class MockConsumer implements ConsumerTarget } @Override + public void setModelObject(final Session session) + { + _modelObject = session; + } + + @Override + public Session<?> getModelObject() + { + return _modelObject; + } + + @Override public void removeConsumerListener(final ConsumerListener listener) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java deleted file mode 100644 index 090e8eead3..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.test.utils.QpidTestCase; - -@SuppressWarnings("rawtypes") -public class DefaultExchangeFactoryTest extends QpidTestCase -{ - private DirectExchangeType _directExchangeType; - private TopicExchangeType _topicExchangeType; - private FanoutExchangeType _fanoutExchangeType; - private HeadersExchangeType _headersExchangeType; - - private List<ExchangeType> _stubbedExchangeTypes; - - protected void setUp() throws Exception - { - super.setUp(); - - _directExchangeType = new DirectExchangeType(); - _topicExchangeType = new TopicExchangeType(); - _fanoutExchangeType = new FanoutExchangeType(); - _headersExchangeType = new HeadersExchangeType(); - _stubbedExchangeTypes = new ArrayList<ExchangeType>(); - } - - public void testCreateDefaultExchangeFactory() - { - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_topicExchangeType); - _stubbedExchangeTypes.add(_fanoutExchangeType); - _stubbedExchangeTypes.add(_headersExchangeType); - - DefaultExchangeFactory factory = new TestExchangeFactory(); - - Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = factory.getRegisteredTypes(); - assertEquals("Unexpected number of exchange types", _stubbedExchangeTypes.size(), registeredTypes.size()); - assertTrue("Direct exchange type is not found", registeredTypes.contains(_directExchangeType)); - assertTrue("Fanout exchange type is not found", registeredTypes.contains(_fanoutExchangeType)); - assertTrue("Topic exchange type is not found", registeredTypes.contains(_topicExchangeType)); - assertTrue("Headers exchange type is not found", registeredTypes.contains(_headersExchangeType)); - } - - public void testCreateDefaultExchangeFactoryWithoutAllBaseExchangeTypes() - { - try - { - new TestExchangeFactory(); - fail("Cannot create factory without all base classes"); - } - catch (IllegalStateException e) - { - // pass - } - } - - public void testCreateDefaultExchangeFactoryWithoutDirectExchangeType() - { - _stubbedExchangeTypes.add(_topicExchangeType); - _stubbedExchangeTypes.add(_fanoutExchangeType); - _stubbedExchangeTypes.add(_headersExchangeType); - - try - { - new TestExchangeFactory(); - fail("Cannot create factory without all base classes"); - } - catch (IllegalStateException e) - { - assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _directExchangeType.getType(), e.getMessage()); - } - } - - public void testCreateDefaultExchangeFactoryWithoutTopicExchangeType() - { - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_fanoutExchangeType); - _stubbedExchangeTypes.add(_headersExchangeType); - - try - { - new TestExchangeFactory(); - fail("Cannot create factory without all base classes"); - } - catch (IllegalStateException e) - { - assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _topicExchangeType.getType(), e.getMessage()); - } - } - - public void testCreateDefaultExchangeFactoryWithoutFanoutExchangeType() - { - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_topicExchangeType); - _stubbedExchangeTypes.add(_headersExchangeType); - - try - { - new TestExchangeFactory(); - fail("Cannot create factory without all base classes"); - } - catch (IllegalStateException e) - { - assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _fanoutExchangeType.getType(), e.getMessage()); - } - } - - public void testCreateDefaultExchangeFactoryWithoutHeadersExchangeType() - { - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_topicExchangeType); - _stubbedExchangeTypes.add(_fanoutExchangeType); - - try - { - new TestExchangeFactory(); - fail("Cannot create factory without all base classes"); - } - catch (IllegalStateException e) - { - assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _headersExchangeType.getType(), e.getMessage()); - } - } - - public void testCreateDefaultExchangeFactoryWithDuplicateExchangeTypeName() - { - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_directExchangeType); - - try - { - new TestExchangeFactory(); - fail("Cannot create factory with duplicate exchange type names"); - } - catch (IllegalStateException e) - { - assertTrue( "Unexpected exception message", e.getMessage().contains("ExchangeType with type name '" - + _directExchangeType.getType() + "' is already registered using class '" - + DirectExchangeType.class.getName())); - } - } - - public void testCreateDefaultExchangeFactoryWithCustomExchangeType() - { - ExchangeType<?> customExchangeType = new CustomExchangeType(); - - _stubbedExchangeTypes.add(customExchangeType); - _stubbedExchangeTypes.add(_directExchangeType); - _stubbedExchangeTypes.add(_topicExchangeType); - _stubbedExchangeTypes.add(_fanoutExchangeType); - _stubbedExchangeTypes.add(_headersExchangeType); - - DefaultExchangeFactory factory = new TestExchangeFactory(); - - Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = factory.getRegisteredTypes(); - assertEquals("Unexpected number of exchange types", _stubbedExchangeTypes.size(), registeredTypes.size()); - assertTrue("Direct exchange type is not found", registeredTypes.contains(_directExchangeType)); - assertTrue("Fanout exchange type is not found", registeredTypes.contains(_fanoutExchangeType)); - assertTrue("Topic exchange type is not found", registeredTypes.contains(_topicExchangeType)); - assertTrue("Headers exchange type is not found", registeredTypes.contains(_headersExchangeType)); - assertTrue("Custom exchange type is not found", registeredTypes.contains(customExchangeType)); - } - - public static abstract class CustomExchange implements ExchangeImpl<CustomExchange> - { - } - - private static class CustomExchangeType implements ExchangeType<CustomExchange> - { - @Override - public String getType() - { - return "my-custom-exchange"; - } - - @Override - public CustomExchange newInstance(VirtualHostImpl host, Map<String,Object> attributes) - { - return null; - } - - @Override - public String getDefaultExchangeName() - { - return null; - } - } - - private final class TestExchangeFactory extends DefaultExchangeFactory - { - private TestExchangeFactory() - { - super(null); - } - - @Override - protected Iterable<ExchangeType> loadExchangeTypes() - { - return _stubbedExchangeTypes; - } - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index ddc9f5edf8..33c333c407 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -30,27 +30,30 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; + import junit.framework.TestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - public class FanoutExchangeTest extends TestCase { private FanoutExchange _exchange; private VirtualHostImpl _virtualHost; + private TaskExecutor _taskExecutor; public void setUp() throws UnknownExchangeException { @@ -59,11 +62,21 @@ public class FanoutExchangeTest extends TestCase attributes.put(Exchange.NAME, "test"); attributes.put(Exchange.DURABLE, false); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); _virtualHost = mock(VirtualHostImpl.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); + when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor); _exchange = new FanoutExchange(_virtualHost, attributes); + _exchange.open(); + } + + public void tearDown() throws Exception + { + super.tearDown(); + _taskExecutor.stop(); } public void testIsBoundStringMapAMQQueueWhenQueueIsNull() @@ -115,6 +128,7 @@ public class FanoutExchangeTest extends TestCase { AMQQueue queue = mock(AMQQueue.class); when(queue.getVirtualHost()).thenReturn(_virtualHost); + when(queue.getCategoryClass()).thenReturn(Queue.class); return queue; } @@ -123,6 +137,7 @@ public class FanoutExchangeTest extends TestCase AMQQueue queue1 = mockQueue(); AMQQueue queue2 = mockQueue(); + _exchange.addBinding("key",queue1, null); _exchange.addBinding("key",queue2, null); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 34c1487861..a450c942e6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.server.exchange; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -28,40 +33,44 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; + import junit.framework.TestCase; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Matchers.anySet; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class HeadersExchangeTest extends TestCase { private HeadersExchange _exchange; private VirtualHostImpl _virtualHost; + private TaskExecutor _taskExecutor; @Override public void setUp() throws Exception { super.setUp(); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); _virtualHost = mock(VirtualHostImpl.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); + when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class); + when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor); Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.ID, UUID.randomUUID()); attributes.put(Exchange.NAME, "test"); @@ -71,6 +80,12 @@ public class HeadersExchangeTest extends TestCase } + public void tearDown() throws Exception + { + super.tearDown(); + _taskExecutor.stop(); + } + protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY); @@ -127,6 +142,7 @@ public class HeadersExchangeTest extends TestCase AMQQueue q = mock(AMQQueue.class); when(q.toString()).thenReturn(name); when(q.getVirtualHost()).thenReturn(_virtualHost); + when(q.getCategoryClass()).thenReturn(Queue.class); return q; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 8b3a5336e8..ed2de21b6b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -39,7 +39,6 @@ import org.mockito.stubbing.Answer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.LifetimePolicy; @@ -237,7 +236,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { String queueName = "testDeadLetterQueueEnabled"; - String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); @@ -277,7 +276,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { String queueName = "testDeadLetterQueueEnabled"; - String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); @@ -320,7 +319,7 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = "testDeadLetterQueueDisabled"; - String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); @@ -350,7 +349,7 @@ public class AMQQueueFactoryTest extends QpidTestCase { String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; - String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index e38e4daea0..37519e7a0b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -44,6 +44,7 @@ public class StandardQueueTest extends AbstractQueueTestBase public void testAutoDeleteQueue() throws Exception { getQueue().stop(); + getQueue().delete(); Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getQname()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java deleted file mode 100644 index 90308d316b..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.security.group; - -import java.util.HashMap; -import java.util.Map; - -import junit.framework.TestCase; - -import org.apache.qpid.server.model.GroupProvider; -import org.apache.qpid.test.utils.TestFileUtils; - -public class FileGroupManagerFactoryTest extends TestCase -{ - - private FileGroupManagerFactory _factory = new FileGroupManagerFactory(); - private Map<String, Object> _configuration = new HashMap<String, Object>(); - private String _emptyButValidGroupFile = TestFileUtils.createTempFile(this).getAbsolutePath(); - - public void testInstanceCreated() throws Exception - { - _configuration.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - _configuration.put(FileGroupManagerFactory.PATH, _emptyButValidGroupFile); - - GroupManager manager = _factory.createInstance(_configuration); - assertNotNull(manager); - assertTrue(manager instanceof FileGroupManager); - } - - public void testReturnsNullWhenNoConfig() throws Exception - { - GroupManager manager = _factory.createInstance(_configuration); - assertNull(manager); - } - - public void testReturnsNullWhenConfigNotForThisPlugin() throws Exception - { - _configuration.put(GroupProvider.TYPE, "other-group-manager"); - - GroupManager manager = _factory.createInstance(_configuration); - assertNull(manager); - } - - - public void testRejectsConfigThatIsMissingAttributeValue() throws Exception - { - _configuration.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - _configuration.put(FileGroupManagerFactory.PATH, null); - - try - { - _factory.createInstance(_configuration); - fail("Exception not thrown"); - } - catch (RuntimeException re) - { - // PASS - } - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java deleted file mode 100644 index 152703d548..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security.group; - -import java.io.File; -import java.io.FileOutputStream; -import java.security.Principal; -import java.util.Properties; -import java.util.Set; - -import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.test.utils.QpidTestCase; - -public class FileGroupManagerTest extends QpidTestCase -{ - private static final String MYGROUP_USERS = "user1"; - private static final String MY_GROUP = "myGroup.users"; - private static final String MY_GROUP2 = "myGroup2.users"; - private File _tmpGroupFile; - private FileGroupManager _manager; - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - - if (_tmpGroupFile != null) - { - if (_tmpGroupFile.exists()) - { - _tmpGroupFile.delete(); - } - } - } - - public void testValidGroupFile() throws Exception - { - final String groupFileName = writeGroupFile(); - - _manager = new FileGroupManager(groupFileName); - assertNotNull(_manager); - } - - public void testNonExistentGroupFile() throws Exception - { - final String filePath = TMP_FOLDER + File.separator + "non.existing"; - File file = new File(filePath); - if (file.exists()) - { - file.delete(); - } - assertFalse("File should not exist", file.exists()); - try - { - _manager = new FileGroupManager(filePath); - assertFalse("File should be created", file.exists()); - _manager.onCreate(); - assertTrue("File should be created", file.exists()); - _manager.open(); - Set<Principal> groups = _manager.getGroupPrincipals(); - assertTrue("No group should exist", groups.isEmpty()); - } - finally - { - file.delete(); - } - } - - public void testGetGroupPrincipalsForUser() throws Exception - { - final String groupFileName = writeGroupFile(); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getGroupPrincipalsForUser("user1"); - assertEquals(1, principals.size()); - assertTrue(principals.contains(new GroupPrincipal("myGroup"))); - } - - public void testGetUserPrincipalsForGroup() throws Exception - { - final String groupFileName = writeGroupFile(); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup"); - assertEquals(1, principals.size()); - assertTrue(principals.contains(new UsernamePrincipal("user1"))); - } - - public void testGetGroupPrincipals() throws Exception - { - final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS, MY_GROUP2, MYGROUP_USERS); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getGroupPrincipals(); - assertEquals(2, principals.size()); - assertTrue(principals.contains(new GroupPrincipal("myGroup"))); - assertTrue(principals.contains(new GroupPrincipal("myGroup2"))); - } - - public void testCreateGroup() throws Exception - { - final String groupFileName = writeGroupFile(); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getGroupPrincipals(); - assertEquals(1, principals.size()); - - _manager.createGroup("myGroup2"); - - principals = _manager.getGroupPrincipals(); - assertEquals(2, principals.size()); - assertTrue(principals.contains(new GroupPrincipal("myGroup2"))); - } - - public void testRemoveGroup() throws Exception - { - final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getGroupPrincipals(); - assertEquals(1, principals.size()); - - _manager.removeGroup("myGroup"); - - principals = _manager.getGroupPrincipals(); - assertEquals(0, principals.size()); - } - - public void testAddUserToGroup() throws Exception - { - final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup"); - assertEquals(1, principals.size()); - assertFalse(principals.contains(new UsernamePrincipal("user2"))); - - _manager.addUserToGroup("user2", "myGroup"); - - principals = _manager.getUserPrincipalsForGroup("myGroup"); - assertEquals(2, principals.size()); - assertTrue(principals.contains(new UsernamePrincipal("user2"))); - } - - public void testRemoveUserInGroup() throws Exception - { - final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS); - _manager = new FileGroupManager(groupFileName); - _manager.open(); - Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup"); - assertEquals(1, principals.size()); - assertTrue(principals.contains(new UsernamePrincipal("user1"))); - - _manager.removeUserFromGroup("user1", "myGroup"); - - principals = _manager.getUserPrincipalsForGroup("myGroup"); - assertEquals(0, principals.size()); - } - - private String writeGroupFile() throws Exception - { - return writeGroupFile(MY_GROUP, MYGROUP_USERS); - } - - private String writeGroupFile(String... groupAndUsers) throws Exception - { - if (groupAndUsers.length % 2 != 0) - { - throw new IllegalArgumentException("Number of groupAndUsers must be even"); - } - - _tmpGroupFile = File.createTempFile("groups", "grp"); - _tmpGroupFile.deleteOnExit(); - - Properties props = new Properties(); - for (int i = 0 ; i < groupAndUsers.length; i=i+2) - { - String group = groupAndUsers[i]; - String users = groupAndUsers[i+1]; - props.put(group, users); - } - - props.store(new FileOutputStream(_tmpGroupFile), "test group file"); - - return _tmpGroupFile.getCanonicalPath(); - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 1113f8699b..01f4ed4299 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -21,24 +21,33 @@ package org.apache.qpid.server.util; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.net.SocketAddress; +import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import javax.security.auth.Subject; + +import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore; import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.SystemContext; +import org.apache.qpid.server.model.SystemContextImpl; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -46,6 +55,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -68,6 +78,7 @@ public class BrokerTestHelper public static Broker createBrokerMock() { + ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); SubjectCreator subjectCreator = mock(SubjectCreator.class); when(subjectCreator.getMechanisms()).thenReturn(""); Broker broker = mock(Broker.class); @@ -77,7 +88,9 @@ public class BrokerTestHelper when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger())); when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false)); + when(broker.getObjectFactory()).thenReturn(objectFactory); when(broker.getEventLogger()).thenReturn(new EventLogger()); + when(broker.getCategoryClass()).thenReturn(Broker.class); return broker; } @@ -94,14 +107,20 @@ public class BrokerTestHelper { //VirtualHostFactory factory = new PluggableFactoryLoader<VirtualHostFactory>(VirtualHostFactory.class).get(hostType); - + SystemContext systemContext = new SystemContextImpl(TASK_EXECUTOR, + new ConfiguredObjectFactoryImpl(Model.getInstance()), + mock(EventLogger.class), + mock(LogRecorder.class), + new BrokerOptions()); + ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); Broker broker = mock(Broker.class); + when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext); when(broker.getVirtualHostRegistry()).thenReturn(virtualHostRegistry); when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR); SecurityManager securityManager = new SecurityManager(broker, false); when(broker.getSecurityManager()).thenReturn(securityManager); - - ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactory(Model.getInstance()); + when(broker.getCategoryClass()).thenReturn(Broker.class); + when(broker.getObjectFactory()).thenReturn(objectFactory); ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory(org.apache.qpid.server.model.VirtualHost.class, attributes); @@ -165,17 +184,29 @@ public class BrokerTestHelper public static ExchangeImpl createExchange(String hostName, final boolean durable, final EventLogger eventLogger) throws Exception { SecurityManager securityManager = new SecurityManager(mock(Broker.class), false); - VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); + final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getName()).thenReturn(hostName); when(virtualHost.getSecurityManager()).thenReturn(securityManager); when(virtualHost.getEventLogger()).thenReturn(eventLogger); - DefaultExchangeFactory factory = new DefaultExchangeFactory(virtualHost); - Map<String,Object> attributes = new HashMap<String, Object>(); + when(virtualHost.getDurableConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class)); + ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); + final Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName())); attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct"); attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct"); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); - return factory.createExchange(attributes); + final ConfiguredObjectTypeFactory<? extends Exchange> exchangeFactory = + objectFactory.getConfiguredObjectTypeFactory(Exchange.class, attributes); + return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<ExchangeImpl>() + { + @Override + public ExchangeImpl run() + { + + return (ExchangeImpl) exchangeFactory.create(attributes, virtualHost); + } + }); + } public static AMQQueue createQueue(String queueName, VirtualHostImpl virtualHost) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index e6b57d8039..8cbf999dfb 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -20,6 +20,15 @@ */ package org.apache.qpid.server.virtualhost; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -28,43 +37,39 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.StoreException; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.qpid.server.configuration.IllegalConfigurationException; -import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.FanoutExchange; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class DurableConfigurationRecovererTest extends QpidTestCase { @@ -76,50 +81,55 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; private DurableConfigurationRecoverer _durableConfigurationRecoverer; - private ExchangeImpl<?> _directExchange; - private ExchangeImpl<?> _topicExchange; - private ExchangeImpl<?> _matchExchange; - private ExchangeImpl<?> _fanoutExchange; private VirtualHostImpl _vhost; private DurableConfigurationStore _store; - private ExchangeFactory _exchangeFactory; - private ExchangeRegistry _exchangeRegistry; private QueueFactory _queueFactory; + private ConfiguredObjectFactory _configuredObjectFactory; + private ConfiguredObjectTypeFactory _exchangeFactory; @Override public void setUp() throws Exception { super.setUp(); - - _exchangeFactory = mock(ExchangeFactory.class); - - _directExchange = createAndRegisterDefaultExchangeWithFactory(DirectExchange.TYPE); - _topicExchange = createAndRegisterDefaultExchangeWithFactory(TopicExchange.TYPE); - _matchExchange = createAndRegisterDefaultExchangeWithFactory(HeadersExchange.TYPE); - _fanoutExchange = createAndRegisterDefaultExchangeWithFactory(FanoutExchange.TYPE); + _configuredObjectFactory = mock(ConfiguredObjectFactory.class); + _exchangeFactory = mock(ConfiguredObjectTypeFactory.class); AMQQueue<?> queue = mock(AMQQueue.class); _vhost = mock(VirtualHostImpl.class); when(_vhost.getName()).thenReturn(VIRTUAL_HOST_NAME); - - _exchangeRegistry = mock(ExchangeRegistry.class); + final Broker<?> broker = mock(Broker.class); + final SystemContext systemContext = mock(SystemContext.class); + when(systemContext.getObjectFactory()).thenReturn(_configuredObjectFactory); + when(broker.getObjectFactory()).thenReturn(_configuredObjectFactory); + when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext); + when(_vhost.getParent(eq(Broker.class))).thenReturn(broker); when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); - final ArgumentCaptor<ExchangeImpl> registeredExchange = ArgumentCaptor.forClass(ExchangeImpl.class); + when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory); + + final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); doAnswer(new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - ExchangeImpl exchange = registeredExchange.getValue(); - when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange); - when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange); - return null; + ConfiguredObjectRecord exchangeRecord = recoveredExchange.getValue(); + ExchangeImpl exchange = mock(ExchangeImpl.class); + UUID id = exchangeRecord.getId(); + String name = (String) exchangeRecord.getAttributes().get("name"); + when(exchange.getId()).thenReturn(id); + when(exchange.getName()).thenReturn(name); + when(_vhost.getExchange(eq(id))).thenReturn(exchange); + when(_vhost.getExchange(eq(name))).thenReturn(exchange); + + UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class); + when(unresolved.resolve()).thenReturn(exchange); + return unresolved; } - }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); + }).when(_exchangeFactory).recover(recoveredExchange.capture(), any(ConfiguredObject.class)); @@ -164,7 +174,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); final ExchangeImpl exchange = - (ExchangeImpl) _exchangeRegistry.getExchange(exchangeId); + (ExchangeImpl) _vhost.getExchange(exchangeId); queue.setAlternateExchange(exchange); } return queue; @@ -174,9 +184,9 @@ public class DurableConfigurationRecovererTest extends QpidTestCase DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory), - new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), - new BindingRecoverer(_vhost, _exchangeRegistry) + new QueueRecoverer(_vhost, _queueFactory), + new ExchangeRecoverer(_vhost), + new BindingRecoverer(_vhost) }; final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>(); @@ -192,19 +202,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } - private ExchangeImpl<?> createAndRegisterDefaultExchangeWithFactory(ExchangeType<?> exchangeType) throws AMQUnknownExchangeType, UnknownExchangeException - { - ExchangeImpl exchange = mock(ExchangeImpl.class); - when(exchange.getExchangeType()).thenReturn(exchangeType); - Map<String, Object> directExchangeAttrsWithId = new HashMap<String, Object>(); - directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID(exchangeType.getDefaultExchangeName(), VIRTUAL_HOST_NAME)); - directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.DURABLE, true); - directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeType.getType()); - directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.NAME, exchangeType.getDefaultExchangeName()); - when(_exchangeFactory.restoreExchange(directExchangeAttrsWithId)).thenReturn(exchange); - return exchange; - } - public void testUpgradeEmptyStore() throws Exception { _durableConfigurationRecoverer.beginConfigurationRecovery(_store); @@ -295,47 +292,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase "org.apache.qpid.server.model.Exchange", createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE))); - final ExchangeImpl customExchange = mock(ExchangeImpl.class); - - final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); - when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>() - { - @Override - public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable - { - Map arguments = attributesCaptor.getValue(); - String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME); - if(CUSTOM_EXCHANGE_NAME.equals(exchangeName) - && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) - && customExchangeId.equals((UUID) arguments.get(org.apache.qpid.server.model.Exchange.ID))) - { - return customExchange; - } - else if ("amq.topic".equals(exchangeName)) - { - return _topicExchange; - } - else if ("amq.direct".equals(exchangeName)) - { - return _directExchange; - } - else if ("amq.fanout".equals(exchangeName)) - { - return _fanoutExchange; - } - else if ("amq.match".equals(exchangeName)) - { - return _matchExchange; - } - else - { - return null; - } - } - }); - - - final ConfiguredObjectRecord[] expected = { new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding", createBinding("key", "not-a-selector", "moo")), @@ -443,49 +399,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final UUID queueId = new UUID(1, 0); final UUID exchangeId = new UUID(2, 0); - final ExchangeImpl customExchange = mock(ExchangeImpl.class); - - when(customExchange.getId()).thenReturn(exchangeId); - when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); - - final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class); - - when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>() - { - @Override - public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable - { - Map arguments = attributesCaptor.getValue(); - String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME); - if(CUSTOM_EXCHANGE_NAME.equals(exchangeName) - && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) - && exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID))) - { - return customExchange; - } - else if ("amq.topic".equals(exchangeName)) - { - return _topicExchange; - } - else if ("amq.direct".equals(exchangeName)) - { - return _directExchange; - } - else if ("amq.fanout".equals(exchangeName)) - { - return _fanoutExchange; - } - else if ("amq.match".equals(exchangeName)) - { - return _matchExchange; - } - else - { - return null; - } - } - }); - _durableConfigurationRecoverer.beginConfigurationRecovery(_store); _durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(queueId, Queue.class.getSimpleName(), @@ -496,7 +409,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.completeConfigurationRecovery(); - assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange()); + assertEquals(CUSTOM_EXCHANGE_NAME, _vhost.getQueue(queueId).getAlternateExchange().getName()); } private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws StoreException @@ -578,4 +491,5 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 37b464be40..85eede527a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -35,12 +35,12 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectAttribute; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHostAlias; -import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -433,20 +433,15 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu return null; } - @Override - public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes() - { - return null; - } - public SecurityManager getSecurityManager() { return null; } @Override - public void addVirtualHostListener(VirtualHostListener listener) + public ConfiguredObjectFactory getObjectFactory() { + return null; } public LinkRegistry getLinkRegistry(String remoteContainerId) @@ -535,6 +530,18 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override + public <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id) + { + return null; + } + + @Override + public <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name) + { + return null; + } + + @Override public <C extends ConfiguredObject> C createChild(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 718a1d0b9b..275ab4416e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_10; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; + import java.net.SocketAddress; import java.security.Principal; import java.security.PrivilegedAction; @@ -29,7 +33,9 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import javax.security.auth.Subject; + import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; @@ -55,10 +61,6 @@ import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; - public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>, LogSubject, AuthorizationHolder { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 0dc1a822e9..c2ef68d812 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -299,6 +299,7 @@ public class ServerConnectionDelegate extends ServerDelegate stopAllSubscriptions(conn, dtc); Session ssn = conn.getSession(dtc.getChannel()); ((ServerSession)ssn).setClose(true); + ((ServerSession)ssn).getModelObject().delete(); super.sessionDetach(conn, dtc); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 07345e7f0a..187b2bf569 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -130,6 +130,7 @@ public class ServerSession extends Session private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private org.apache.qpid.server.model.Session<?> _modelObject; public static interface MessageDispositionChangeListener @@ -859,6 +860,10 @@ public class ServerSession extends Session // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session unregisterSubscriptions(); + if(_modelObject != null) + { + _modelObject.delete(); + } super.close(); } @@ -1003,6 +1008,18 @@ public class ServerSession extends Session _consumerListeners.remove(listener); } + @Override + public void setModelObject(final org.apache.qpid.server.model.Session<?> session) + { + _modelObject = session; + } + + @Override + public org.apache.qpid.server.model.Session<?> getModelObject() + { + return _modelObject; + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 4fa4dcaa11..200519a285 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; import java.security.AccessControlException; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; @@ -43,13 +45,15 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AlreadyKnownDtxException; @@ -63,14 +67,15 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.*; +import org.apache.qpid.server.virtualhost.ExchangeExistsException; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; +import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.*; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.Map; - public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); @@ -1278,7 +1283,7 @@ public class ServerSessionDelegate extends SessionDelegate arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName); } - final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); + final UUID id = UUID.randomUUID(); arguments.put(Queue.ID, id); arguments.put(Queue.NAME, queueName); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b852b22abb..70094ea7c7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -86,6 +86,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; @@ -195,6 +196,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); + private Session<?> _modelObject; public AMQChannel(T session, int channelId, final MessageStore messageStore) @@ -737,6 +739,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _transaction.rollback(); + if(_modelObject != null) + { + _modelObject.delete(); + } try { @@ -1759,4 +1765,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { _consumerListeners.remove(listener); } + + @Override + public void setModelObject(final Session<?> session) + { + _modelObject = session; + } + + @Override + public Session<?> getModelObject() + { + return _modelObject; + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 42cb66ce7e..ef8d01d89f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -192,7 +191,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); final String queueNameString = AMQShortString.toString(queueName); attributes.put(Queue.NAME, queueNameString); - attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName())); + attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.DURABLE, durable); LifetimePolicy lifetimePolicy; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 2322865c80..7f4a3701cd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -272,11 +272,13 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } } + private static final AtomicInteger portNumber = new AtomicInteger(0); + private static class TestNetworkConnection implements NetworkConnection { private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; - private int _port = 1; + private int _port = portNumber.incrementAndGet(); private final Sender<ByteBuffer> _sender; public TestNetworkConnection() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index aa0c3f2e4b..f6823824fd 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -21,16 +21,18 @@ package org.apache.qpid.server.protocol.v1_0; import java.security.AccessControlException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; -import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; + import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; @@ -41,26 +43,41 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Modified; +import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Released; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler { @@ -204,7 +221,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { Map<String,Object> attributes = new HashMap<String,Object>(); - attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, name); attributes.put(Queue.DURABLE, isDurable); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 36aab5ddac..2c7884b3ce 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -78,7 +78,7 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; -import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.LinkRegistry; @@ -112,6 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); + private Session<?> _modelObject; public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -433,7 +434,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); Map<String,Object> attributes = new HashMap<String,Object>(); - attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName())); + attributes.put(org.apache.qpid.server.model.Queue.ID, UUID.randomUUID()); attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); @@ -570,6 +571,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio { performCloseTasks(); _endpoint.end(); + if(_modelObject != null) + { + _modelObject.delete(); + } } protected void performCloseTasks() @@ -844,6 +849,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio _consumerListeners.remove(listener); } + @Override + public void setModelObject(final Session<?> session) + { + _modelObject = session; + } + + @Override + public Session<?> getModelObject() + { + return _modelObject; + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 8f282c0d50..0f19b097e7 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; @MBeanDescription("This MBean exposes the broker level management features") public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker @@ -185,6 +186,10 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi { theExchange.delete(); } + catch(RequiredExchangeException e) + { + throw new UnsupportedOperationException(e.getMessage(), e); + } catch (IllegalStateException ex) { final JMException jme = new JMException(ex.toString()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java index d9320a93d0..7a954c0185 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java @@ -36,7 +36,8 @@ import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.security.group.FileGroupManagerFactory; +import org.apache.qpid.server.model.adapter.FileBasedGroupProvider; +import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.test.utils.TestFileUtils; @@ -76,11 +77,11 @@ public class GroupProviderRestTest extends QpidRestTestCase assertEquals("Unexpected number of providers", 1, providerDetails.size()); for (Map<String, Object> provider : providerDetails) { - assertProvider(FILE_GROUP_MANAGER, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, provider); + assertProvider(FILE_GROUP_MANAGER, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, provider); Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + provider.get(GroupProvider.NAME)); assertNotNull("Cannot load data for " + provider.get(GroupProvider.NAME), data); - assertProvider(FILE_GROUP_MANAGER, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, data); + assertProvider(FILE_GROUP_MANAGER, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, data); } } @@ -127,16 +128,16 @@ public class GroupProviderRestTest extends QpidRestTestCase String providerName = getTestName(); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath()); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Group provider was not created", 201, responseCode); Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + providerName + "?depth=2"); - assertProvider(providerName, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, data); + assertProvider(providerName, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, data); assertEquals("Unexpected name", providerName, data.get(GroupProvider.NAME)); - assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileGroupManagerFactory.PATH)); + assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileBasedGroupProvider.PATH)); @SuppressWarnings("unchecked") List<Map<String, Object>> groups = (List<Map<String, Object>>) data.get("groups"); @@ -174,7 +175,7 @@ public class GroupProviderRestTest extends QpidRestTestCase String providerName = getTestName(); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Group provider was created", 409, responseCode); @@ -189,15 +190,15 @@ public class GroupProviderRestTest extends QpidRestTestCase String providerName = getTestName(); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath()); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Group provider was not created", 201, responseCode); Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + providerName); assertEquals("Unexpected name", providerName, data.get(GroupProvider.NAME)); - assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileGroupManagerFactory.PATH)); + assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileBasedGroupProvider.PATH)); @SuppressWarnings("unchecked") List<Map<String, Object>> groups = (List<Map<String, Object>>) data.get("groups"); @@ -220,8 +221,8 @@ public class GroupProviderRestTest extends QpidRestTestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath()); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Group provider was not created", 201, responseCode); @@ -244,8 +245,8 @@ public class GroupProviderRestTest extends QpidRestTestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath()); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Expected to fail because we can have only one password provider", 201, responseCode); @@ -271,14 +272,14 @@ public class GroupProviderRestTest extends QpidRestTestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, providerName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath()); int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Expected to fail because we can have only one password provider", 201, responseCode); File newGroupFile = new File(TMP_FOLDER + File.separator + getTestName() + File.separator + "groups"); - attributes.put(FileGroupManagerFactory.PATH, newGroupFile.getAbsolutePath()); + attributes.put(FileBasedGroupProvider.PATH, newGroupFile.getAbsolutePath()); responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes); assertEquals("Expected to fail because we can have only one password provider", 409, responseCode); @@ -310,7 +311,7 @@ public class GroupProviderRestTest extends QpidRestTestCase Map<String, Object> groupProvider = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + TestBrokerConfiguration.ENTRY_NAME_GROUP_FILE); assertEquals("Unexpected id", id.toString(), groupProvider.get(GroupProvider.ID)); - assertEquals("Unexpected path", file.getAbsolutePath() , groupProvider.get(FileGroupManagerFactory.PATH)); + assertEquals("Unexpected path", file.getAbsolutePath() , groupProvider.get(FileBasedGroupProvider.PATH)); assertEquals("Unexpected state", State.ERRORED.name() , groupProvider.get(GroupProvider.STATE)); int status = getRestTestHelper().submitRequest("/rest/groupprovider/" + TestBrokerConfiguration.ENTRY_NAME_GROUP_FILE, "DELETE", null); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index 1a39d9c3b0..541160cd80 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -41,13 +41,14 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.adapter.FileBasedGroupProvider; +import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl; import org.apache.qpid.server.security.FileKeyStore; import org.apache.qpid.server.security.FileTrustStore; import org.apache.qpid.server.security.access.FileAccessControlProviderConstants; import org.apache.qpid.server.security.acl.AbstractACLTestCase; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory; -import org.apache.qpid.server.security.group.FileGroupManagerFactory; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.systest.rest.QpidRestTestCase; @@ -738,8 +739,8 @@ public class BrokerACLTest extends QpidRestTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, groupProviderName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, "/path/to/file"); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, "/path/to/file"); responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes); assertEquals("Setting of group provider attributes should be allowed but not supported", 409, responseCode); } @@ -761,8 +762,8 @@ public class BrokerACLTest extends QpidRestTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, groupProviderName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, "/path/to/file"); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, "/path/to/file"); responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes); assertEquals("Setting of group provider attributes should be denied", 403, responseCode); } @@ -852,8 +853,8 @@ public class BrokerACLTest extends QpidRestTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, accessControlProviderName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, "/path/to/file"); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, "/path/to/file"); responseCode = getRestTestHelper().submitRequest("/rest/accesscontrolprovider/" + accessControlProviderName, "PUT", attributes); assertEquals("Setting of access control provider attributes should be allowed but not supported", 409, responseCode); } @@ -875,8 +876,8 @@ public class BrokerACLTest extends QpidRestTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, accessControlProviderName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, "/path/to/file"); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, "/path/to/file"); responseCode = getRestTestHelper().submitRequest("/rest/accesscontrolprovider/" + accessControlProviderName, "PUT", attributes); assertEquals("Setting of access control provider attributes should be denied", 403, responseCode); } @@ -1073,8 +1074,8 @@ public class BrokerACLTest extends QpidRestTestCase File file = TestFileUtils.createTempFile(this, ".groups"); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, groupProviderName); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, file.getAbsoluteFile()); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, file.getAbsoluteFile()); return getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index c31f2eed3d..0e9256e1c6 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -62,6 +62,7 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.server.Broker; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.VirtualHost; @@ -77,6 +78,8 @@ import org.apache.qpid.util.SystemUtils; */ public class QpidBrokerTestCase extends QpidTestCase { + private TaskExecutor _taskExecutor; + public enum BrokerType { EXTERNAL /** Test case relies on a Broker started independently of the test-suite */, @@ -226,7 +229,12 @@ public class QpidBrokerTestCase extends QpidTestCase public TestBrokerConfiguration createBrokerConfiguration(int port) { int actualPort = getPort(port); - TestBrokerConfiguration configuration = new TestBrokerConfiguration(System.getProperty(_brokerStoreType), _configFile.getAbsolutePath()); + if(_taskExecutor == null) + { + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); + } + TestBrokerConfiguration configuration = new TestBrokerConfiguration(System.getProperty(_brokerStoreType), _configFile.getAbsolutePath(), _taskExecutor); synchronized (_brokerConfigurations) { _brokerConfigurations.put(actualPort, configuration); @@ -341,7 +349,8 @@ public class QpidBrokerTestCase extends QpidTestCase protected void setUp() throws Exception { super.setUp(); - + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); if (!_configFile.exists()) { fail("Unable to test without config file:" + _configFile); @@ -1192,6 +1201,10 @@ public class QpidBrokerTestCase extends QpidTestCase { c.close(); } + if(_taskExecutor != null) + { + _taskExecutor.stop(); + } } /** diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java index fd62ce75b9..3feb2eaab9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java @@ -37,15 +37,16 @@ import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.PreferencesProvider; import org.apache.qpid.server.model.SystemContextImpl; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.adapter.FileBasedGroupProvider; +import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl; import org.apache.qpid.server.security.access.FileAccessControlProviderConstants; -import org.apache.qpid.server.security.group.FileGroupManagerFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; @@ -73,9 +74,9 @@ public class TestBrokerConfiguration private MemoryConfigurationEntryStore _store; private boolean _saved; - public TestBrokerConfiguration(String storeType, String intialStoreLocation) + public TestBrokerConfiguration(String storeType, String intialStoreLocation, final TaskExecutor taskExecutor) { - _store = new MemoryConfigurationEntryStore(new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory( + _store = new MemoryConfigurationEntryStore(new SystemContextImpl(taskExecutor, new ConfiguredObjectFactoryImpl( Model.getInstance()), mock(EventLogger.class), mock(LogRecorder.class), mock(BrokerOptions.class)), @@ -163,8 +164,8 @@ public class TestBrokerConfiguration { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(GroupProvider.NAME, ENTRY_NAME_GROUP_FILE); - attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE); - attributes.put(FileGroupManagerFactory.PATH, groupFilePath); + attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); + attributes.put(FileBasedGroupProvider.PATH, groupFilePath); return addObjectConfiguration(GroupProvider.class, attributes); } |
