diff options
Diffstat (limited to 'qpid/java')
14 files changed, 337 insertions, 540 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index de17acabab..97d6355fa4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -163,7 +163,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost> getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); removeHouseKeepingTasks(); - getQueueRegistry().close(); getDtxRegistry().close(); finalState = VirtualHostState.PASSIVE; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java deleted file mode 100644 index 9a051be324..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpid.server.exchange.AMQUnknownExchangeType; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.QueueExistsException; - -public class AMQQueueFactory implements QueueFactory -{ - - - public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; - public static final String DLQ_ROUTING_KEY = "dlq"; - private static final int MAX_LENGTH = 255; - - private final VirtualHostImpl _virtualHost; - private final QueueRegistry _queueRegistry; - - public AMQQueueFactory(VirtualHostImpl virtualHost, QueueRegistry queueRegistry) - { - _virtualHost = virtualHost; - _queueRegistry = queueRegistry; - } - - @Override - public AMQQueue restoreQueue(Map<String, Object> attributes) - { - return createOrRestoreQueue(attributes, false); - - } - - @Override - public AMQQueue createQueue(Map<String, Object> attributes) - { - return createOrRestoreQueue(attributes, true); - } - - private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore) - { - String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - boolean createDLQ = createInStore && shouldCreateDLQ(attributes, _virtualHost.getDefaultDeadLetterQueueEnabled()); - if (createDLQ) - { - validateDLNames(queueName); - } - - AMQQueue queue; - - if(attributes.containsKey(SortedQueue.SORT_KEY)) - { - queue = new SortedQueueImpl(_virtualHost, attributes); - } - else if(attributes.containsKey(LastValueQueue.LVQ_KEY)) - { - queue = new LastValueQueueImpl(_virtualHost, attributes); - } - else if(attributes.containsKey(PriorityQueue.PRIORITIES)) - { - queue = new PriorityQueueImpl(_virtualHost, attributes); - } - else - { - queue = new StandardQueueImpl(_virtualHost, attributes); - } - queue.open(); - //Register the new queue - _queueRegistry.registerQueue(queue); - - if(createDLQ) - { - createDLQ(queue); - } - else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) - { - - final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); - ExchangeImpl altExchange; - try - { - altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) - { - altExchange = _virtualHost.getExchange(altExchangeAttr); - } - queue.setAlternateExchange(altExchange); - } - - if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue); - } - - return queue; - } - - private void createDLQ(final AMQQueue queue) - { - final String queueName = queue.getName(); - final String dlExchangeName = getDeadLetterExchangeName(queueName); - final String dlQueueName = getDeadLetterQueueName(queueName); - - ExchangeImpl dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); - - try - { - Map<String,Object> attributes = new HashMap<String, Object>(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); - attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - dlExchange = _virtualHost.createExchange(attributes); - } - catch(ExchangeExistsException e) - { - // We're ok if the exchange already exists - dlExchange = e.getExistingExchange(); - } - catch (ReservedExchangeNameException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (AMQUnknownExchangeType e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (UnknownExchangeException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - - AMQQueue dlQueue = null; - - synchronized(_queueRegistry) - { - dlQueue = _queueRegistry.getQueue(dlQueueName); - - if(dlQueue == null) - { - //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc - final Map<String, Object> args = new HashMap<String, Object>(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); - args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - - try - { - - - args.put(Queue.ID, UUID.randomUUID()); - args.put(Queue.NAME, dlQueueName); - args.put(Queue.DURABLE, true); - dlQueue = _virtualHost.createQueue(args); - } - catch (QueueExistsException e) - { - throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + - "queue already exists, however this occurred within " + - "a block where the queue existence had previously been " + - "checked, and no queue creation should have been " + - "possible from another thread", e); - } - } - } - - //ensure the queue is bound to the exchange - if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) - { - //actual routing key used does not matter due to use of fanout exchange, - //but we will make the key 'dlq' as it can be logged at creation. - dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); - } - queue.setAlternateExchange(dlExchange); - } - - private static void validateDLNames(String name) - { - // check if DLQ name and DLQ exchange name do not exceed 255 - String exchangeName = getDeadLetterExchangeName(name); - if (exchangeName.length() > MAX_LENGTH) - { - throw new IllegalArgumentException("DL exchange name '" + exchangeName - + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); - } - String queueName = getDeadLetterQueueName(name); - if (queueName.length() > MAX_LENGTH) - { - throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " - + MAX_LENGTH + " characters for queue " + name); - } - } - - private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled) - { - boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, - Queue.LIFETIME_POLICY, - arguments, - LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; - - //feature is not to be enabled for temporary queues or when explicitly disabled by argument - if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) - { - boolean dlqArgumentPresent = arguments != null - && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); - if (dlqArgumentPresent) - { - boolean dlqEnabled = true; - if (dlqArgumentPresent) - { - Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); - dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) - || (argument instanceof String && Boolean.parseBoolean(argument.toString())); - } - return dlqEnabled; - } - return virtualHostDefaultDeadLetterQueueEnabled; - } - return false; - } - - private static String getDeadLetterQueueName(String name) - { - return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX); - } - - private static String getDeadLetterExchangeName(String name) - { - return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index c015bd6d7c..f6a667e612 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -313,7 +313,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _logSubject = new QueueLogSubject(this); - _virtualHost.getSecurityManager().authoriseCreateQueue(this); + try + { + + _virtualHost.getSecurityManager().authoriseCreateQueue(this); + } + catch(AccessControlException e) + { + deleted(); + throw e; + } Subject activeSubject = Subject.getSubject(AccessController.getContext()); Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java index 9ee9125bcf..079f04f92f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java @@ -31,7 +31,7 @@ public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implem public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; - protected LastValueQueueImpl(VirtualHostImpl virtualHost, + public LastValueQueueImpl(VirtualHostImpl virtualHost, Map<String, Object> attributes) { super(virtualHost, attributes, entryList(attributes)); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java index 4f73bea8e6..89d542aded 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java @@ -30,7 +30,7 @@ public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implem public static final int DEFAULT_PRIORITY_LEVELS = 10; - protected PriorityQueueImpl(VirtualHostImpl virtualHost, + public PriorityQueueImpl(VirtualHostImpl virtualHost, Map<String, Object> attributes) { super(virtualHost, attributes, entryList(attributes)); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java deleted file mode 100644 index 5b13c4c574..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.Map; - -import org.apache.qpid.server.protocol.AMQSessionModel; - -public interface QueueFactory -{ - AMQQueue createQueue(Map<String, Object> arguments); - - AMQQueue restoreQueue(Map<String, Object> arguments); - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java index 3115f6f581..c495b09d4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -44,7 +44,7 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements } - protected SortedQueueImpl(VirtualHostImpl virtualHost, + public SortedQueueImpl(VirtualHostImpl virtualHost, Map<String, Object> attributes) { this(virtualHost, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index b4a54aa5cb..0484841aa9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -43,6 +43,7 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -69,13 +70,13 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.LastValueQueue; import org.apache.qpid.server.queue.LastValueQueueImpl; -import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.PriorityQueueImpl; import org.apache.qpid.server.queue.SortedQueue; +import org.apache.qpid.server.queue.SortedQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -91,12 +92,17 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X> implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener, VirtualHost<X,AMQQueue<?>, ExchangeImpl<?>> { + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + public static final String DLQ_ROUTING_KEY = "dlq"; + private static final int MAX_LENGTH = 255; + private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; @@ -107,12 +113,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private final Broker<?> _broker; - private final QueueRegistry _queueRegistry; - private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; - private final AMQQueueFactory _queueFactory; + private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry(); private volatile VirtualHostState _state = VirtualHostState.INITIALISING; @@ -182,10 +186,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); - _queueRegistry = new DefaultQueueRegistry(this); - - _queueFactory = new AMQQueueFactory(this, _queueRegistry); - _defaultDestination = new DefaultDestination(this); } @@ -419,11 +419,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Queue.class) - { - return (Collection<C>) getQueues(); - } - else if(clazz == Connection.class) + if(clazz == Connection.class) { return (Collection<C>) getConnections(); } @@ -567,15 +563,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _createTime; } - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - @Override public AMQQueue<?> getQueue(String name) { - return _queueRegistry.getQueue(name); + return (AMQQueue<?>) getChildByName(Queue.class, name); } @Override @@ -588,34 +579,31 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public AMQQueue<?> getQueue(UUID id) { - return _queueRegistry.getQueue(id); + return (AMQQueue<?>) getChildById(Queue.class, id); } @Override public Collection<AMQQueue<?>> getQueues() { - return _queueRegistry.getQueues(); + Collection children = getChildren(Queue.class); + return children; } @Override public int removeQueue(AMQQueue<?> queue) { - synchronized (getQueueRegistry()) - { - int purged = queue.delete(); + int purged = queue.delete(); - getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStore store = getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store, queue); - } - return purged; + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStore store = getDurableConfigurationStore(); + DurableConfigurationStoreHelper.removeQueue(store, queue); } - } + return purged; +} public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException { @@ -656,36 +644,21 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } - String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); - - synchronized (_queueRegistry) + if(!attributes.containsKey(Queue.ID)) { - if(_queueRegistry.getQueue(queueName) != null) - { - throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); - } - if(!attributes.containsKey(Queue.ID)) - { - - UUID id = UUID.randomUUID(); - while(_queueRegistry.getQueue(id) != null) - { - id = UUID.randomUUID(); - } - attributes.put(Queue.ID, id); + UUID id = UUID.randomUUID(); + attributes.put(Queue.ID, id); + } - } - else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) - { - throw new QueueExistsException("Queue with id " - + MapValueConverter.getUUIDAttribute(Queue.ID, - attributes) - + " already exists", _queueRegistry.getQueue(queueName)); - } + boolean createDLQ = shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); + if (createDLQ) + { + // TODO - this isn't really correct - what if the name has ${foo} in it? + validateDLNames(String.valueOf(attributes.get(Queue.NAME))); + } + return createOrRestoreQueue(attributes, true); - return _queueFactory.createQueue(attributes); - } } @@ -873,7 +846,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { //Stop Connections _connectionRegistry.close(); - _queueRegistry.close(); _dtxRegistry.close(); closeStorage(); shutdownHouseKeeping(); @@ -1127,7 +1099,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(this, _queueFactory), + new QueueRecoverer(this), new ExchangeRecoverer(this), new BindingRecoverer(this) }; @@ -1178,7 +1150,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void execute() { - for (AMQQueue<?> q : _queueRegistry.getQueues()) + for (AMQQueue<?> q : getQueues()) { if (_logger.isDebugEnabled()) { @@ -1552,4 +1524,220 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return Collections.unmodifiableCollection(_aliases); } + + // TODO - remove + public AMQQueue restoreQueue(Map<String, Object> attributes) + { + return createOrRestoreQueue(attributes, false); + + } + + + private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore) + { + String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); + boolean createDLQ = createInStore && shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); + if (createDLQ) + { + validateDLNames(queueName); + } + + AMQQueue queue; + + try + { + + + if (attributes.containsKey(SortedQueue.SORT_KEY)) + { + queue = new SortedQueueImpl(this, attributes); + } + else if (attributes.containsKey(LastValueQueue.LVQ_KEY)) + { + queue = new LastValueQueueImpl(this, attributes); + } + else if (attributes.containsKey(PriorityQueue.PRIORITIES)) + { + queue = new PriorityQueueImpl(this, attributes); + } + else + { + queue = new StandardQueueImpl(this, attributes); + } + queue.open(); + } + catch(DuplicateNameException e) + { + + throw new QueueExistsException(e.getName(), getQueue(e.getName())); + } + + if(createDLQ) + { + createDLQ(queue); + } + else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + { + + final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); + ExchangeImpl altExchange; + try + { + altExchange = getExchange(UUID.fromString(altExchangeAttr)); + } + catch(IllegalArgumentException e) + { + altExchange = getExchange(altExchangeAttr); + } + queue.setAlternateExchange(altExchange); + } + + if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); + } + + return queue; + } + + + + private void createDLQ(final AMQQueue queue) + { + final String queueName = queue.getName(); + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); + + ExchangeImpl dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, getName()); + + try + { + Map<String,Object> attributes = new HashMap<String, Object>(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + dlExchange = createExchange(attributes); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + catch (ReservedExchangeNameException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (AMQUnknownExchangeType e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (UnknownExchangeException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + + AMQQueue dlQueue = null; + + { + dlQueue = getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + try + { + + + args.put(Queue.ID, UUID.randomUUID()); + args.put(Queue.NAME, dlQueueName); + args.put(Queue.DURABLE, true); + dlQueue = createQueue(args); + } + catch (QueueExistsException e) + { + // TODO - currently theoretically for two threads to be creating a queue at the same time. + // All model changing operations should be moved to the task executor of the virtual host + } + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null); + } + queue.setAlternateExchange(dlExchange); + } + + private static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + MAX_LENGTH + " characters for queue " + name); + } + } + + private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled) + { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + arguments, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) + { + boolean dlqArgumentPresent = arguments != null + && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); + if (dlqArgumentPresent) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) + || (argument instanceof String && Boolean.parseBoolean(argument.toString())); + } + return dlqEnabled; + } + return virtualHostDefaultDeadLetterQueueEnabled; + } + return false; + } + + private static String getDeadLetterQueueName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); + } + + private static String getDeadLetterExchangeName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); + } + + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 788ae8ac9f..1841f1e45f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -31,7 +31,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedDependency; @@ -41,13 +40,10 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ { private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); private final VirtualHostImpl _virtualHost; - private final QueueFactory _queueFactory; - public QueueRecoverer(final VirtualHostImpl virtualHost, - final QueueFactory queueFactory) + public QueueRecoverer(final VirtualHostImpl virtualHost) { _virtualHost = virtualHost; - _queueFactory = queueFactory; } @Override @@ -110,7 +106,7 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes); attributes.put(Queue.ID, _id); attributes.put(Queue.DURABLE, true); - _queue = _queueFactory.restoreQueue(attributes); + _queue = _virtualHost.restoreQueue(attributes); } return _queue; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 91b1a9e408..afb65fa326 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -127,4 +127,7 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM TaskExecutor getTaskExecutor(); EventLogger getEventLogger(); + + // TODO - remove + public AMQQueue restoreQueue(Map<String, Object> attributes); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 8cbf999dfb..95de8fd9e5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -60,7 +60,6 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationRecoverer; @@ -83,7 +82,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationRecoverer _durableConfigurationRecoverer; private VirtualHostImpl _vhost; private DurableConfigurationStore _store; - private QueueFactory _queueFactory; private ConfiguredObjectFactory _configuredObjectFactory; private ConfiguredObjectTypeFactory _exchangeFactory; @@ -135,9 +133,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class); - _queueFactory = mock(QueueFactory.class); - - when(_queueFactory.restoreQueue(attributesArg.capture())).then( + when(_vhost.restoreQueue(attributesArg.capture())).then( new Answer() { @@ -184,7 +180,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _queueFactory), + new QueueRecoverer(_vhost), new ExchangeRecoverer(_vhost), new BindingRecoverer(_vhost) }; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 85eede527a..a58a251fbe 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -652,6 +652,12 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override + public AMQQueue restoreQueue(final Map<String, Object> attributes) + { + return null; + } + + @Override public boolean getDefaultDeadLetterQueueEnabled() { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java index 8ec78e952e..5313b416c0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java @@ -18,158 +18,65 @@ * under the License. * */ -package org.apache.qpid.server.queue; +package org.apache.qpid.server.virtualhost; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.PriorityQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; -public class AMQQueueFactoryTest extends QpidTestCase +public class VirtualHostQueueCreationTest extends QpidTestCase { - private QueueRegistry _queueRegistry; private VirtualHostImpl _virtualHost; - private AMQQueueFactory _queueFactory; - private List<AMQQueue> _queues; + private Broker _broker; @Override public void setUp() throws Exception { super.setUp(); - _queues = new ArrayList<AMQQueue>(); - - _virtualHost = mock(VirtualHostImpl.class); - when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); - when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); - - DurableConfigurationStore store = mock(DurableConfigurationStore.class); - when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); - - mockExchangeCreation(); - mockQueueRegistry(); - delegateVhostQueueCreation(); - - when(_virtualHost.getQueues()).thenReturn(_queues); + _broker = BrokerTestHelper.createBrokerMock(); + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); - - _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry); + _virtualHost = createHost(); + _virtualHost.open(); } - - private void delegateVhostQueueCreation() throws Exception - { - - final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class); - - when(_virtualHost.createQueue(attributes.capture())).then( - new Answer<AMQQueue>() - { - @Override - public AMQQueue answer(InvocationOnMock invocation) throws Throwable - { - return _queueFactory.createQueue(attributes.getValue()); - } - } - ); - } - - private void mockQueueRegistry() - { - _queueRegistry = mock(QueueRegistry.class); - - final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class); - doAnswer(new Answer() - { - - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - AMQQueue queue = capturedQueue.getValue(); - when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue); - when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue); - when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue); - when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue); - _queues.add(queue); - - return null; - } - }).when(_queueRegistry).registerQueue(capturedQueue.capture()); - } - - private void mockExchangeCreation() throws Exception + private VirtualHostImpl createHost() { - final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class); - - - when(_virtualHost.createExchange(attributes.capture())).then( - new Answer<ExchangeImpl>() - { - @Override - public ExchangeImpl answer(InvocationOnMock invocation) throws Throwable - { - Map attributeValues = attributes.getValue(); - final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues); - final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues); - - final ExchangeImpl exchange = mock(ExchangeImpl.class); - ExchangeType exType = mock(ExchangeType.class); - - when(exchange.getName()).thenReturn(name); - when(exchange.getId()).thenReturn(id); - when(exchange.getExchangeType()).thenReturn(exType); - - final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues); - when(exType.getType()).thenReturn(typeName); - when(exchange.getTypeName()).thenReturn(typeName); - - when(_virtualHost.getExchange(eq(name))).thenReturn(exchange); - when(_virtualHost.getExchange(eq(id))).thenReturn(exchange); - - final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class); - - when(exchange.addBinding(anyString(), queue.capture(), anyMap())).then(new Answer<Boolean>() - { - - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable - { - when(exchange.isBound(eq(queue.getValue()))).thenReturn(true); - return true; - } - }); - - return exchange; - } - } - ); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHost.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, + TestMemoryMessageStore.TYPE)); + + attributes = new HashMap<String, Object>(attributes); + attributes.put(VirtualHost.ID, UUID.randomUUID()); + return new StandardVirtualHost(attributes, _broker); } @Override @@ -198,7 +105,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(PriorityQueue.PRIORITIES, 5); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -209,14 +116,14 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testSimpleQueueRegistration() throws Exception { String queueName = getName(); - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass()); verifyQueueRegistered(queueName); @@ -229,7 +136,7 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does * cause the alternate exchange to be set and DLQ to be produced. */ public void testDeadLetterQueueEnabled() throws Exception @@ -237,7 +144,7 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); @@ -248,7 +155,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.NAME, queueName); attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); ExchangeImpl altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -277,7 +184,7 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); @@ -288,7 +195,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts()); ExchangeImpl altExchange = queue.getAlternateExchange(); @@ -310,7 +217,7 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not * result in the alternate exchange being set and DLQ being created. */ public void testDeadLetterQueueDisabled() throws Exception @@ -320,7 +227,7 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); @@ -329,7 +236,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.NAME, queueName); attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); @@ -341,7 +248,7 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but * creating an auto-delete queue, does not result in the alternate exchange * being set and DLQ being created. */ @@ -350,7 +257,7 @@ public class AMQQueueFactoryTest extends QpidTestCase String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); @@ -363,7 +270,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); //create an autodelete queue - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("Queue should be autodelete", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS, queue.getLifetimePolicy()); @@ -378,7 +285,7 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has * the desired effect. */ public void testMaximumDeliveryCount() throws Exception @@ -389,7 +296,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = _queueFactory.createQueue(attributes); + final AMQQueue queue = _virtualHost.createQueue(attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts()); @@ -398,7 +305,7 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * Tests that omitting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means * that queue is created with a default maximumDeliveryCount of zero (unless set in config). */ public void testMaximumDeliveryCountDefault() throws Exception @@ -407,7 +314,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault"); - final AMQQueue queue = _queueFactory.createQueue(attributes); + final AMQQueue queue = _virtualHost.createQueue(attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts()); @@ -425,7 +332,7 @@ public class AMQQueueFactoryTest extends QpidTestCase Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(Queue.ID, UUID.randomUUID()); - _queueFactory.createQueue(attributes); + _virtualHost.createQueue(attributes); fail("queue with null name can not be created!"); } catch (Exception e) @@ -454,7 +361,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - _queueFactory.createQueue(attributes); + _virtualHost.createQueue(attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -484,7 +391,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - _queueFactory.createQueue(attributes); + _virtualHost.createQueue(attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -504,7 +411,7 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey"); attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true); - AMQQueue queue = _queueFactory.createQueue(attributes); + AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index bf36fcbd2e..5e1e38106a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -305,11 +305,11 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase AMQDestination checkQueueDLQ; if(durableSub) { - checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); } else { - checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); } assertEquals("The DLQ should have " + expected + " msgs on it", expected, @@ -323,12 +323,12 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase MessageConsumer consumer; if(durableSub) { - consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); } else { consumer = clientSession.createConsumer( - clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); } //keep track of the message we expect to still be on the DLQ |
