From 36c6512134a729f2f7abb1fa6469a63b743dad1b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 17 Apr 2014 16:17:46 +0000 Subject: QPID-5710 : [Java Broker] Remove AMQQueueFactory and QueueRegistry git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588301 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBHAVirtualHost.java | 1 - .../apache/qpid/server/queue/AMQQueueFactory.java | 274 ----------- .../apache/qpid/server/queue/AbstractQueue.java | 11 +- .../qpid/server/queue/LastValueQueueImpl.java | 2 +- .../qpid/server/queue/PriorityQueueImpl.java | 2 +- .../org/apache/qpid/server/queue/QueueFactory.java | 33 -- .../apache/qpid/server/queue/SortedQueueImpl.java | 2 +- .../server/virtualhost/AbstractVirtualHost.java | 320 ++++++++++--- .../qpid/server/virtualhost/QueueRecoverer.java | 8 +- .../qpid/server/virtualhost/VirtualHostImpl.java | 3 + .../qpid/server/queue/AMQQueueFactoryTest.java | 523 --------------------- .../DurableConfigurationRecovererTest.java | 8 +- .../qpid/server/virtualhost/MockVirtualHost.java | 6 + .../virtualhost/VirtualHostQueueCreationTest.java | 430 +++++++++++++++++ .../test/unit/client/MaxDeliveryCountTest.java | 10 +- 15 files changed, 715 insertions(+), 918 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index de17acabab..97d6355fa4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -163,7 +163,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); removeHouseKeepingTasks(); - getQueueRegistry().close(); getDtxRegistry().close(); finalState = VirtualHostState.PASSIVE; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java deleted file mode 100644 index 9a051be324..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.apache.qpid.server.exchange.AMQUnknownExchangeType; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.UnknownExchangeException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.server.virtualhost.QueueExistsException; - -public class AMQQueueFactory implements QueueFactory -{ - - - public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; - public static final String DLQ_ROUTING_KEY = "dlq"; - private static final int MAX_LENGTH = 255; - - private final VirtualHostImpl _virtualHost; - private final QueueRegistry _queueRegistry; - - public AMQQueueFactory(VirtualHostImpl virtualHost, QueueRegistry queueRegistry) - { - _virtualHost = virtualHost; - _queueRegistry = queueRegistry; - } - - @Override - public AMQQueue restoreQueue(Map attributes) - { - return createOrRestoreQueue(attributes, false); - - } - - @Override - public AMQQueue createQueue(Map attributes) - { - return createOrRestoreQueue(attributes, true); - } - - private AMQQueue createOrRestoreQueue(Map attributes, boolean createInStore) - { - String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - boolean createDLQ = createInStore && shouldCreateDLQ(attributes, _virtualHost.getDefaultDeadLetterQueueEnabled()); - if (createDLQ) - { - validateDLNames(queueName); - } - - AMQQueue queue; - - if(attributes.containsKey(SortedQueue.SORT_KEY)) - { - queue = new SortedQueueImpl(_virtualHost, attributes); - } - else if(attributes.containsKey(LastValueQueue.LVQ_KEY)) - { - queue = new LastValueQueueImpl(_virtualHost, attributes); - } - else if(attributes.containsKey(PriorityQueue.PRIORITIES)) - { - queue = new PriorityQueueImpl(_virtualHost, attributes); - } - else - { - queue = new StandardQueueImpl(_virtualHost, attributes); - } - queue.open(); - //Register the new queue - _queueRegistry.registerQueue(queue); - - if(createDLQ) - { - createDLQ(queue); - } - else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) - { - - final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); - ExchangeImpl altExchange; - try - { - altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) - { - altExchange = _virtualHost.getExchange(altExchangeAttr); - } - queue.setAlternateExchange(altExchange); - } - - if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue); - } - - return queue; - } - - private void createDLQ(final AMQQueue queue) - { - final String queueName = queue.getName(); - final String dlExchangeName = getDeadLetterExchangeName(queueName); - final String dlQueueName = getDeadLetterQueueName(queueName); - - ExchangeImpl dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); - - try - { - Map attributes = new HashMap(); - - attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); - attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - dlExchange = _virtualHost.createExchange(attributes); - } - catch(ExchangeExistsException e) - { - // We're ok if the exchange already exists - dlExchange = e.getExistingExchange(); - } - catch (ReservedExchangeNameException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (AMQUnknownExchangeType e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - catch (UnknownExchangeException e) - { - throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); - } - - AMQQueue dlQueue = null; - - synchronized(_queueRegistry) - { - dlQueue = _queueRegistry.getQueue(dlQueueName); - - if(dlQueue == null) - { - //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc - final Map args = new HashMap(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); - args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - - try - { - - - args.put(Queue.ID, UUID.randomUUID()); - args.put(Queue.NAME, dlQueueName); - args.put(Queue.DURABLE, true); - dlQueue = _virtualHost.createQueue(args); - } - catch (QueueExistsException e) - { - throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " + - "queue already exists, however this occurred within " + - "a block where the queue existence had previously been " + - "checked, and no queue creation should have been " + - "possible from another thread", e); - } - } - } - - //ensure the queue is bound to the exchange - if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) - { - //actual routing key used does not matter due to use of fanout exchange, - //but we will make the key 'dlq' as it can be logged at creation. - dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null); - } - queue.setAlternateExchange(dlExchange); - } - - private static void validateDLNames(String name) - { - // check if DLQ name and DLQ exchange name do not exceed 255 - String exchangeName = getDeadLetterExchangeName(name); - if (exchangeName.length() > MAX_LENGTH) - { - throw new IllegalArgumentException("DL exchange name '" + exchangeName - + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); - } - String queueName = getDeadLetterQueueName(name); - if (queueName.length() > MAX_LENGTH) - { - throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " - + MAX_LENGTH + " characters for queue " + name); - } - } - - private static boolean shouldCreateDLQ(Map arguments, boolean virtualHostDefaultDeadLetterQueueEnabled) - { - boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, - Queue.LIFETIME_POLICY, - arguments, - LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; - - //feature is not to be enabled for temporary queues or when explicitly disabled by argument - if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) - { - boolean dlqArgumentPresent = arguments != null - && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); - if (dlqArgumentPresent) - { - boolean dlqEnabled = true; - if (dlqArgumentPresent) - { - Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); - dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) - || (argument instanceof String && Boolean.parseBoolean(argument.toString())); - } - return dlqEnabled; - } - return virtualHostDefaultDeadLetterQueueEnabled; - } - return false; - } - - private static String getDeadLetterQueueName(String name) - { - return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX); - } - - private static String getDeadLetterExchangeName(String name) - { - return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index c015bd6d7c..f6a667e612 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -313,7 +313,16 @@ public abstract class AbstractQueue> _logSubject = new QueueLogSubject(this); - _virtualHost.getSecurityManager().authoriseCreateQueue(this); + try + { + + _virtualHost.getSecurityManager().authoriseCreateQueue(this); + } + catch(AccessControlException e) + { + deleted(); + throw e; + } Subject activeSubject = Subject.getSubject(AccessController.getContext()); Set sessionPrincipals = activeSubject == null ? Collections.emptySet() : activeSubject.getPrincipals(SessionPrincipal.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java index 9ee9125bcf..079f04f92f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java @@ -31,7 +31,7 @@ public class LastValueQueueImpl extends AbstractQueue implem public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; - protected LastValueQueueImpl(VirtualHostImpl virtualHost, + public LastValueQueueImpl(VirtualHostImpl virtualHost, Map attributes) { super(virtualHost, attributes, entryList(attributes)); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java index 4f73bea8e6..89d542aded 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java @@ -30,7 +30,7 @@ public class PriorityQueueImpl extends OutOfOrderQueue implem public static final int DEFAULT_PRIORITY_LEVELS = 10; - protected PriorityQueueImpl(VirtualHostImpl virtualHost, + public PriorityQueueImpl(VirtualHostImpl virtualHost, Map attributes) { super(virtualHost, attributes, entryList(attributes)); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java deleted file mode 100644 index 5b13c4c574..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.Map; - -import org.apache.qpid.server.protocol.AMQSessionModel; - -public interface QueueFactory -{ - AMQQueue createQueue(Map arguments); - - AMQQueue restoreQueue(Map arguments); - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java index 3115f6f581..c495b09d4a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -44,7 +44,7 @@ public class SortedQueueImpl extends OutOfOrderQueue implements } - protected SortedQueueImpl(VirtualHostImpl virtualHost, + public SortedQueueImpl(VirtualHostImpl virtualHost, Map attributes) { this(virtualHost, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index b4a54aa5cb..0484841aa9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -43,6 +43,7 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -69,13 +70,13 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.LastValueQueue; import org.apache.qpid.server.queue.LastValueQueueImpl; -import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.PriorityQueueImpl; import org.apache.qpid.server.queue.SortedQueue; +import org.apache.qpid.server.queue.SortedQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -91,12 +92,17 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost> extends AbstractConfiguredObject implements VirtualHostImpl, ExchangeImpl>, IConnectionRegistry.RegistryChangeListener, EventListener, VirtualHost, ExchangeImpl> { + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + public static final String DLQ_ROUTING_KEY = "dlq"; + private static final int MAX_LENGTH = 255; + private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; @@ -107,12 +113,10 @@ public abstract class AbstractVirtualHost> exte private final Broker _broker; - private final QueueRegistry _queueRegistry; - private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; - private final AMQQueueFactory _queueFactory; + private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry(); private volatile VirtualHostState _state = VirtualHostState.INITIALISING; @@ -182,10 +186,6 @@ public abstract class AbstractVirtualHost> exte _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); - _queueRegistry = new DefaultQueueRegistry(this); - - _queueFactory = new AMQQueueFactory(this, _queueRegistry); - _defaultDestination = new DefaultDestination(this); } @@ -419,11 +419,7 @@ public abstract class AbstractVirtualHost> exte @Override public Collection getChildren(Class clazz) { - if(clazz == Queue.class) - { - return (Collection) getQueues(); - } - else if(clazz == Connection.class) + if(clazz == Connection.class) { return (Collection) getConnections(); } @@ -567,15 +563,10 @@ public abstract class AbstractVirtualHost> exte return _createTime; } - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - @Override public AMQQueue getQueue(String name) { - return _queueRegistry.getQueue(name); + return (AMQQueue) getChildByName(Queue.class, name); } @Override @@ -588,34 +579,31 @@ public abstract class AbstractVirtualHost> exte @Override public AMQQueue getQueue(UUID id) { - return _queueRegistry.getQueue(id); + return (AMQQueue) getChildById(Queue.class, id); } @Override public Collection> getQueues() { - return _queueRegistry.getQueues(); + Collection children = getChildren(Queue.class); + return children; } @Override public int removeQueue(AMQQueue queue) { - synchronized (getQueueRegistry()) - { - int purged = queue.delete(); + int purged = queue.delete(); - getQueueRegistry().unregisterQueue(queue.getName()); - if (queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStore store = getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store, queue); - } - return purged; + if (queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStore store = getDurableConfigurationStore(); + DurableConfigurationStoreHelper.removeQueue(store, queue); } - } + return purged; +} public AMQQueue createQueue(Map attributes) throws QueueExistsException { @@ -656,36 +644,21 @@ public abstract class AbstractVirtualHost> exte } } - String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes); - - synchronized (_queueRegistry) + if(!attributes.containsKey(Queue.ID)) { - if(_queueRegistry.getQueue(queueName) != null) - { - throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); - } - if(!attributes.containsKey(Queue.ID)) - { - - UUID id = UUID.randomUUID(); - while(_queueRegistry.getQueue(id) != null) - { - id = UUID.randomUUID(); - } - attributes.put(Queue.ID, id); + UUID id = UUID.randomUUID(); + attributes.put(Queue.ID, id); + } - } - else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null) - { - throw new QueueExistsException("Queue with id " - + MapValueConverter.getUUIDAttribute(Queue.ID, - attributes) - + " already exists", _queueRegistry.getQueue(queueName)); - } + boolean createDLQ = shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); + if (createDLQ) + { + // TODO - this isn't really correct - what if the name has ${foo} in it? + validateDLNames(String.valueOf(attributes.get(Queue.NAME))); + } + return createOrRestoreQueue(attributes, true); - return _queueFactory.createQueue(attributes); - } } @@ -873,7 +846,6 @@ public abstract class AbstractVirtualHost> exte { //Stop Connections _connectionRegistry.close(); - _queueRegistry.close(); _dtxRegistry.close(); closeStorage(); shutdownHouseKeeping(); @@ -1127,7 +1099,7 @@ public abstract class AbstractVirtualHost> exte protected Map getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(this, _queueFactory), + new QueueRecoverer(this), new ExchangeRecoverer(this), new BindingRecoverer(this) }; @@ -1178,7 +1150,7 @@ public abstract class AbstractVirtualHost> exte public void execute() { - for (AMQQueue q : _queueRegistry.getQueues()) + for (AMQQueue q : getQueues()) { if (_logger.isDebugEnabled()) { @@ -1552,4 +1524,220 @@ public abstract class AbstractVirtualHost> exte return Collections.unmodifiableCollection(_aliases); } + + // TODO - remove + public AMQQueue restoreQueue(Map attributes) + { + return createOrRestoreQueue(attributes, false); + + } + + + private AMQQueue createOrRestoreQueue(Map attributes, boolean createInStore) + { + String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); + boolean createDLQ = createInStore && shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); + if (createDLQ) + { + validateDLNames(queueName); + } + + AMQQueue queue; + + try + { + + + if (attributes.containsKey(SortedQueue.SORT_KEY)) + { + queue = new SortedQueueImpl(this, attributes); + } + else if (attributes.containsKey(LastValueQueue.LVQ_KEY)) + { + queue = new LastValueQueueImpl(this, attributes); + } + else if (attributes.containsKey(PriorityQueue.PRIORITIES)) + { + queue = new PriorityQueueImpl(this, attributes); + } + else + { + queue = new StandardQueueImpl(this, attributes); + } + queue.open(); + } + catch(DuplicateNameException e) + { + + throw new QueueExistsException(e.getName(), getQueue(e.getName())); + } + + if(createDLQ) + { + createDLQ(queue); + } + else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + { + + final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); + ExchangeImpl altExchange; + try + { + altExchange = getExchange(UUID.fromString(altExchangeAttr)); + } + catch(IllegalArgumentException e) + { + altExchange = getExchange(altExchangeAttr); + } + queue.setAlternateExchange(altExchange); + } + + if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || queue.getLifetimePolicy() + == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); + } + + return queue; + } + + + + private void createDLQ(final AMQQueue queue) + { + final String queueName = queue.getName(); + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); + + ExchangeImpl dlExchange = null; + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, getName()); + + try + { + Map attributes = new HashMap(); + + attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, + false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + dlExchange = createExchange(attributes); + } + catch(ExchangeExistsException e) + { + // We're ok if the exchange already exists + dlExchange = e.getExistingExchange(); + } + catch (ReservedExchangeNameException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (AMQUnknownExchangeType e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + catch (UnknownExchangeException e) + { + throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); + } + + AMQQueue dlQueue = null; + + { + dlQueue = getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc + final Map args = new HashMap(); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); + + try + { + + + args.put(Queue.ID, UUID.randomUUID()); + args.put(Queue.NAME, dlQueueName); + args.put(Queue.DURABLE, true); + dlQueue = createQueue(args); + } + catch (QueueExistsException e) + { + // TODO - currently theoretically for two threads to be creating a queue at the same time. + // All model changing operations should be moved to the task executor of the virtual host + } + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null); + } + queue.setAlternateExchange(dlExchange); + } + + private static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + MAX_LENGTH + " characters for queue " + name); + } + } + + private static boolean shouldCreateDLQ(Map arguments, boolean virtualHostDefaultDeadLetterQueueEnabled) + { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + arguments, + LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; + + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) + { + boolean dlqArgumentPresent = arguments != null + && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); + if (dlqArgumentPresent) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) + || (argument instanceof String && Boolean.parseBoolean(argument.toString())); + } + return dlqEnabled; + } + return virtualHostDefaultDeadLetterQueueEnabled; + } + return false; + } + + private static String getDeadLetterQueueName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); + } + + private static String getDeadLetterExchangeName(String name) + { + return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); + } + + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 788ae8ac9f..1841f1e45f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -31,7 +31,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedDependency; @@ -41,13 +40,10 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer attributes = new LinkedHashMap(_attributes); attributes.put(Queue.ID, _id); attributes.put(Queue.DURABLE, true); - _queue = _queueFactory.restoreQueue(attributes); + _queue = _virtualHost.restoreQueue(attributes); } return _queue; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 91b1a9e408..afb65fa326 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -127,4 +127,7 @@ public interface VirtualHostImpl< X extends VirtualHostImpl, Q extends AM TaskExecutor getTaskExecutor(); EventLogger getEventLogger(); + + // TODO - remove + public AMQQueue restoreQueue(Map attributes); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java deleted file mode 100644 index 8ec78e952e..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AMQQueueFactoryTest extends QpidTestCase -{ - private QueueRegistry _queueRegistry; - private VirtualHostImpl _virtualHost; - private AMQQueueFactory _queueFactory; - private List _queues; - - @Override - public void setUp() throws Exception - { - super.setUp(); - - _queues = new ArrayList(); - - _virtualHost = mock(VirtualHostImpl.class); - when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); - when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); - - DurableConfigurationStore store = mock(DurableConfigurationStore.class); - when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); - - mockExchangeCreation(); - mockQueueRegistry(); - delegateVhostQueueCreation(); - - when(_virtualHost.getQueues()).thenReturn(_queues); - - - _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry); - - - - } - - private void delegateVhostQueueCreation() throws Exception - { - - final ArgumentCaptor attributes = ArgumentCaptor.forClass(Map.class); - - when(_virtualHost.createQueue(attributes.capture())).then( - new Answer() - { - @Override - public AMQQueue answer(InvocationOnMock invocation) throws Throwable - { - return _queueFactory.createQueue(attributes.getValue()); - } - } - ); - } - - private void mockQueueRegistry() - { - _queueRegistry = mock(QueueRegistry.class); - - final ArgumentCaptor capturedQueue = ArgumentCaptor.forClass(AMQQueue.class); - doAnswer(new Answer() - { - - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - AMQQueue queue = capturedQueue.getValue(); - when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue); - when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue); - when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue); - when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue); - _queues.add(queue); - - return null; - } - }).when(_queueRegistry).registerQueue(capturedQueue.capture()); - } - - private void mockExchangeCreation() throws Exception - { - final ArgumentCaptor attributes = ArgumentCaptor.forClass(Map.class); - - - when(_virtualHost.createExchange(attributes.capture())).then( - new Answer() - { - @Override - public ExchangeImpl answer(InvocationOnMock invocation) throws Throwable - { - Map attributeValues = attributes.getValue(); - final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues); - final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues); - - final ExchangeImpl exchange = mock(ExchangeImpl.class); - ExchangeType exType = mock(ExchangeType.class); - - when(exchange.getName()).thenReturn(name); - when(exchange.getId()).thenReturn(id); - when(exchange.getExchangeType()).thenReturn(exType); - - final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues); - when(exType.getType()).thenReturn(typeName); - when(exchange.getTypeName()).thenReturn(typeName); - - when(_virtualHost.getExchange(eq(name))).thenReturn(exchange); - when(_virtualHost.getExchange(eq(id))).thenReturn(exchange); - - final ArgumentCaptor queue = ArgumentCaptor.forClass(AMQQueue.class); - - when(exchange.addBinding(anyString(), queue.capture(), anyMap())).then(new Answer() - { - - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable - { - when(exchange.isBound(eq(queue.getValue()))).thenReturn(true); - return true; - } - }); - - return exchange; - } - } - ); - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - private void verifyRegisteredQueueCount(int count) - { - assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size()); - } - - - private void verifyQueueRegistered(String queueName) - { - assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName)); - } - - public void testPriorityQueueRegistration() throws Exception - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, "testPriorityQueue"); - - attributes.put(PriorityQueue.PRIORITIES, 5); - - - AMQQueue queue = _queueFactory.createQueue(attributes); - - assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass()); - verifyQueueRegistered("testPriorityQueue"); - verifyRegisteredQueueCount(1); - } - - - public void testSimpleQueueRegistration() throws Exception - { - String queueName = getName(); - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - - - AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass()); - verifyQueueRegistered(queueName); - - //verify that no alternate exchange or DLQ were produced - - assertNull("Queue should not have an alternate exchange as DLQ wasn't enabled", queue.getAlternateExchange()); - assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName)); - - verifyRegisteredQueueCount(1); - } - - /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does - * cause the alternate exchange to be set and DLQ to be produced. - */ - public void testDeadLetterQueueEnabled() throws Exception - { - - String queueName = "testDeadLetterQueueEnabled"; - String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - - assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); - assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - - Map attributes = new HashMap(); - - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - - AMQQueue queue = _queueFactory.createQueue(attributes); - - ExchangeImpl altExchange = queue.getAlternateExchange(); - assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); - assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); - assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); - - assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); - assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - - AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); - assertNotNull("The DLQ was not registered as expected", dlQueue); - assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); - assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); - assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); - - //2 queues should have been registered - verifyRegisteredQueueCount(2); - } - - /** - * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration - * are not applied to the DLQ itself. - */ - public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception - { - - String queueName = "testDeadLetterQueueEnabled"; - String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - - assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); - assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5); - - AMQQueue queue = _queueFactory.createQueue(attributes); - - assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts()); - ExchangeImpl altExchange = queue.getAlternateExchange(); - assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); - assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); - assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); - - assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); - assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - - AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); - assertNotNull("The DLQ was not registered as expected", dlQueue); - assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); - assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); - assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); - - //2 queues should have been registered - verifyRegisteredQueueCount(2); - } - - /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not - * result in the alternate exchange being set and DLQ being created. - */ - public void testDeadLetterQueueDisabled() throws Exception - { - Map attributes = new HashMap(); - - - String queueName = "testDeadLetterQueueDisabled"; - String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - - assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); - assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); - - AMQQueue queue = _queueFactory.createQueue(attributes); - - assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); - assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); - - assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName)); - - //only 1 queue should have been registered - verifyRegisteredQueueCount(1); - } - - /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but - * creating an auto-delete queue, does not result in the alternate exchange - * being set and DLQ being created. - */ - public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception - { - - String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; - String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; - String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - - assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); - assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); - - //create an autodelete queue - AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue should be autodelete", - LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS, - queue.getLifetimePolicy()); - - //ensure that the autodelete property overrides the request to enable DLQ - assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); - assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName)); - - //only 1 queue should have been registered - verifyRegisteredQueueCount(1); - } - - /** - * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has - * the desired effect. - */ - public void testMaximumDeliveryCount() throws Exception - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, "testMaximumDeliveryCount"); - - attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - - final AMQQueue queue = _queueFactory.createQueue(attributes); - - assertNotNull("The queue was not registered as expected ", queue); - assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts()); - - verifyRegisteredQueueCount(1); - } - - /** - * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means - * that queue is created with a default maximumDeliveryCount of zero (unless set in config). - */ - public void testMaximumDeliveryCountDefault() throws Exception - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault"); - - final AMQQueue queue = _queueFactory.createQueue(attributes); - - assertNotNull("The queue was not registered as expected ", queue); - assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts()); - - verifyRegisteredQueueCount(1); - } - - /** - * Tests queue creation with queue name set to null - */ - public void testQueueNameNullValidation() - { - try - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - - _queueFactory.createQueue(attributes); - fail("queue with null name can not be created!"); - } - catch (Exception e) - { - assertTrue(e instanceof IllegalArgumentException); - assertEquals("Value for attribute name is not found", e.getMessage()); - } - } - - /** - * Tests queue creation with queue name length less 255 characters but - * corresponding DLQ name length greater than 255. - */ - public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255() - { - String queueName = "test-" + generateStringWithLength('a', 245); - try - { - // change DLQ name to make its length bigger than exchange name - setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); - setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); - - _queueFactory.createQueue(attributes); - fail("queue with DLQ name having more than 255 characters can not be created!"); - } - catch (Exception e) - { - assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); - assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name") - && e.getMessage().contains("length exceeds limit of 255")); - } - } - - /** - * Tests queue creation with queue name length less 255 characters but - * corresponding DL exchange name length greater than 255. - */ - public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255() - { - String queueName = "test-" + generateStringWithLength('a', 245); - try - { - // change DLQ name to make its length bigger than exchange name - setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); - setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, queueName); - - attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); - - _queueFactory.createQueue(attributes); - fail("queue with DLE name having more than 255 characters can not be created!"); - } - catch (Exception e) - { - assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); - assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name") - && e.getMessage().contains("length exceeds limit of 255")); - } - } - - public void testMessageGroupQueue() throws Exception - { - - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, getTestName()); - attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey"); - attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true); - - AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); - assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); - } - - private String generateStringWithLength(char ch, int length) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < length; i++) - { - sb.append(ch); - } - return sb.toString(); - } - - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 8cbf999dfb..95de8fd9e5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -60,7 +60,6 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationRecoverer; @@ -83,7 +82,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationRecoverer _durableConfigurationRecoverer; private VirtualHostImpl _vhost; private DurableConfigurationStore _store; - private QueueFactory _queueFactory; private ConfiguredObjectFactory _configuredObjectFactory; private ConfiguredObjectTypeFactory _exchangeFactory; @@ -135,9 +133,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final ArgumentCaptor attributesArg = ArgumentCaptor.forClass(Map.class); - _queueFactory = mock(QueueFactory.class); - - when(_queueFactory.restoreQueue(attributesArg.capture())).then( + when(_vhost.restoreQueue(attributesArg.capture())).then( new Answer() { @@ -184,7 +180,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _queueFactory), + new QueueRecoverer(_vhost), new ExchangeRecoverer(_vhost), new BindingRecoverer(_vhost) }; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 85eede527a..a58a251fbe 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -651,6 +651,12 @@ public class MockVirtualHost implements VirtualHostImpl attributes) + { + return null; + } + @Override public boolean getDefaultDeadLetterQueueEnabled() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java new file mode 100644 index 0000000000..5313b416c0 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java @@ -0,0 +1,430 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.PriorityQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; + +public class VirtualHostQueueCreationTest extends QpidTestCase +{ + private VirtualHostImpl _virtualHost; + private Broker _broker; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _broker = BrokerTestHelper.createBrokerMock(); + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_broker.getTaskExecutor()).thenReturn(taskExecutor); + + _virtualHost = createHost(); + _virtualHost.open(); + + + + } + private VirtualHostImpl createHost() + { + Map attributes = new HashMap(); + attributes.put(VirtualHost.NAME, getName()); + attributes.put(VirtualHost.TYPE, StandardVirtualHost.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, + TestMemoryMessageStore.TYPE)); + + attributes = new HashMap(attributes); + attributes.put(VirtualHost.ID, UUID.randomUUID()); + return new StandardVirtualHost(attributes, _broker); + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + private void verifyRegisteredQueueCount(int count) + { + assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size()); + } + + + private void verifyQueueRegistered(String queueName) + { + assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName)); + } + + public void testPriorityQueueRegistration() throws Exception + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testPriorityQueue"); + + attributes.put(PriorityQueue.PRIORITIES, 5); + + + AMQQueue queue = _virtualHost.createQueue(attributes); + + assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass()); + verifyQueueRegistered("testPriorityQueue"); + verifyRegisteredQueueCount(1); + } + + + public void testSimpleQueueRegistration() throws Exception + { + String queueName = getName(); + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + + AMQQueue queue = _virtualHost.createQueue(attributes); + assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass()); + verifyQueueRegistered(queueName); + + //verify that no alternate exchange or DLQ were produced + + assertNull("Queue should not have an alternate exchange as DLQ wasn't enabled", queue.getAlternateExchange()); + assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName)); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does + * cause the alternate exchange to be set and DLQ to be produced. + */ + public void testDeadLetterQueueEnabled() throws Exception + { + + String queueName = "testDeadLetterQueueEnabled"; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; + + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); + + Map attributes = new HashMap(); + + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + AMQQueue queue = _virtualHost.createQueue(attributes); + + ExchangeImpl altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); + + assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); + + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration + * are not applied to the DLQ itself. + */ + public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception + { + + String queueName = "testDeadLetterQueueEnabled"; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; + + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5); + + AMQQueue queue = _virtualHost.createQueue(attributes); + + assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts()); + ExchangeImpl altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); + + assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); + + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not + * result in the alternate exchange being set and DLQ being created. + */ + public void testDeadLetterQueueDisabled() throws Exception + { + Map attributes = new HashMap(); + + + String queueName = "testDeadLetterQueueDisabled"; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; + + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); + + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); + + AMQQueue queue = _virtualHost.createQueue(attributes); + + assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); + assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); + + assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but + * creating an auto-delete queue, does not result in the alternate exchange + * being set and DLQ being created. + */ + public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception + { + + String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; + String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX; + String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX; + + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + + //create an autodelete queue + AMQQueue queue = _virtualHost.createQueue(attributes); + assertEquals("Queue should be autodelete", + LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS, + queue.getLifetimePolicy()); + + //ensure that the autodelete property overrides the request to enable DLQ + assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); + assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); + assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * the desired effect. + */ + public void testMaximumDeliveryCount() throws Exception + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCount"); + + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); + + final AMQQueue queue = _virtualHost.createQueue(attributes); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that omitting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * that queue is created with a default maximumDeliveryCount of zero (unless set in config). + */ + public void testMaximumDeliveryCountDefault() throws Exception + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault"); + + final AMQQueue queue = _virtualHost.createQueue(attributes); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests queue creation with queue name set to null + */ + public void testQueueNameNullValidation() + { + try + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + + _virtualHost.createQueue(attributes); + fail("queue with null name can not be created!"); + } + catch (Exception e) + { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Value for attribute name is not found", e.getMessage()); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DLQ name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); + setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + + _virtualHost.createQueue(attributes); + fail("queue with DLQ name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name") + && e.getMessage().contains("length exceeds limit of 255")); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DL exchange name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); + setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, queueName); + + attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + + _virtualHost.createQueue(attributes); + fail("queue with DLE name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name") + && e.getMessage().contains("length exceeds limit of 255")); + } + } + + public void testMessageGroupQueue() throws Exception + { + + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, getTestName()); + attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey"); + attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true); + + AMQQueue queue = _virtualHost.createQueue(attributes); + assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); + assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); + } + + private String generateStringWithLength(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return sb.toString(); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index bf36fcbd2e..5e1e38106a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.RejectBehaviour; import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -305,11 +305,11 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase AMQDestination checkQueueDLQ; if(durableSub) { - checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); } else { - checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); } assertEquals("The DLQ should have " + expected + " msgs on it", expected, @@ -323,12 +323,12 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase MessageConsumer consumer; if(durableSub) { - consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); } else { consumer = clientSession.createConsumer( - clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX)); + clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX)); } //keep track of the message we expect to still be on the DLQ -- cgit v1.2.1