summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java274
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java320
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java8
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java (renamed from qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java)197
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java10
14 files changed, 337 insertions, 540 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index de17acabab..97d6355fa4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -163,7 +163,6 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost>
getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
removeHouseKeepingTasks();
- getQueueRegistry().close();
getDtxRegistry().close();
finalState = VirtualHostState.PASSIVE;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
deleted file mode 100644
index 9a051be324..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeExistsException;
-import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.QueueExistsException;
-
-public class AMQQueueFactory implements QueueFactory
-{
-
-
- public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
- public static final String DLQ_ROUTING_KEY = "dlq";
- private static final int MAX_LENGTH = 255;
-
- private final VirtualHostImpl _virtualHost;
- private final QueueRegistry _queueRegistry;
-
- public AMQQueueFactory(VirtualHostImpl virtualHost, QueueRegistry queueRegistry)
- {
- _virtualHost = virtualHost;
- _queueRegistry = queueRegistry;
- }
-
- @Override
- public AMQQueue restoreQueue(Map<String, Object> attributes)
- {
- return createOrRestoreQueue(attributes, false);
-
- }
-
- @Override
- public AMQQueue createQueue(Map<String, Object> attributes)
- {
- return createOrRestoreQueue(attributes, true);
- }
-
- private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore)
- {
- String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
- boolean createDLQ = createInStore && shouldCreateDLQ(attributes, _virtualHost.getDefaultDeadLetterQueueEnabled());
- if (createDLQ)
- {
- validateDLNames(queueName);
- }
-
- AMQQueue queue;
-
- if(attributes.containsKey(SortedQueue.SORT_KEY))
- {
- queue = new SortedQueueImpl(_virtualHost, attributes);
- }
- else if(attributes.containsKey(LastValueQueue.LVQ_KEY))
- {
- queue = new LastValueQueueImpl(_virtualHost, attributes);
- }
- else if(attributes.containsKey(PriorityQueue.PRIORITIES))
- {
- queue = new PriorityQueueImpl(_virtualHost, attributes);
- }
- else
- {
- queue = new StandardQueueImpl(_virtualHost, attributes);
- }
- queue.open();
- //Register the new queue
- _queueRegistry.registerQueue(queue);
-
- if(createDLQ)
- {
- createDLQ(queue);
- }
- else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
- {
-
- final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
- ExchangeImpl altExchange;
- try
- {
- altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
- }
- catch(IllegalArgumentException e)
- {
- altExchange = _virtualHost.getExchange(altExchangeAttr);
- }
- queue.setAlternateExchange(altExchange);
- }
-
- if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_SESSION_END))
- {
- DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), queue);
- }
-
- return queue;
- }
-
- private void createDLQ(final AMQQueue queue)
- {
- final String queueName = queue.getName();
- final String dlExchangeName = getDeadLetterExchangeName(queueName);
- final String dlQueueName = getDeadLetterQueueName(queueName);
-
- ExchangeImpl dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
-
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- dlExchange = _virtualHost.createExchange(attributes);
- }
- catch(ExchangeExistsException e)
- {
- // We're ok if the exchange already exists
- dlExchange = e.getExistingExchange();
- }
- catch (ReservedExchangeNameException e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
- }
- catch (AMQUnknownExchangeType e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
- }
- catch (UnknownExchangeException e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
- }
-
- AMQQueue dlQueue = null;
-
- synchronized(_queueRegistry)
- {
- dlQueue = _queueRegistry.getQueue(dlQueueName);
-
- if(dlQueue == null)
- {
- //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
- final Map<String, Object> args = new HashMap<String, Object>();
- args.put(Queue.CREATE_DLQ_ON_CREATION, false);
- args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
-
- try
- {
-
-
- args.put(Queue.ID, UUID.randomUUID());
- args.put(Queue.NAME, dlQueueName);
- args.put(Queue.DURABLE, true);
- dlQueue = _virtualHost.createQueue(args);
- }
- catch (QueueExistsException e)
- {
- throw new ServerScopedRuntimeException("Attempt to create a queue failed because the " +
- "queue already exists, however this occurred within " +
- "a block where the queue existence had previously been " +
- "checked, and no queue creation should have been " +
- "possible from another thread", e);
- }
- }
- }
-
- //ensure the queue is bound to the exchange
- if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
- {
- //actual routing key used does not matter due to use of fanout exchange,
- //but we will make the key 'dlq' as it can be logged at creation.
- dlExchange.addBinding(DLQ_ROUTING_KEY, dlQueue, null);
- }
- queue.setAlternateExchange(dlExchange);
- }
-
- private static void validateDLNames(String name)
- {
- // check if DLQ name and DLQ exchange name do not exceed 255
- String exchangeName = getDeadLetterExchangeName(name);
- if (exchangeName.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("DL exchange name '" + exchangeName
- + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name);
- }
- String queueName = getDeadLetterQueueName(name);
- if (queueName.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
- + MAX_LENGTH + " characters for queue " + name);
- }
- }
-
- private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled)
- {
- boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
- Queue.LIFETIME_POLICY,
- arguments,
- LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
-
- //feature is not to be enabled for temporary queues or when explicitly disabled by argument
- if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
- {
- boolean dlqArgumentPresent = arguments != null
- && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
- if (dlqArgumentPresent)
- {
- boolean dlqEnabled = true;
- if (dlqArgumentPresent)
- {
- Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
- dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
- || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
- }
- return dlqEnabled;
- }
- return virtualHostDefaultDeadLetterQueueEnabled;
- }
- return false;
- }
-
- private static String getDeadLetterQueueName(String name)
- {
- return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX);
- }
-
- private static String getDeadLetterExchangeName(String name)
- {
- return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
- }
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c015bd6d7c..f6a667e612 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -313,7 +313,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_logSubject = new QueueLogSubject(this);
- _virtualHost.getSecurityManager().authoriseCreateQueue(this);
+ try
+ {
+
+ _virtualHost.getSecurityManager().authoriseCreateQueue(this);
+ }
+ catch(AccessControlException e)
+ {
+ deleted();
+ throw e;
+ }
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
index 9ee9125bcf..079f04f92f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
@@ -31,7 +31,7 @@ public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implem
public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
- protected LastValueQueueImpl(VirtualHostImpl virtualHost,
+ public LastValueQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
super(virtualHost, attributes, entryList(attributes));
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
index 4f73bea8e6..89d542aded 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
@@ -30,7 +30,7 @@ public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implem
public static final int DEFAULT_PRIORITY_LEVELS = 10;
- protected PriorityQueueImpl(VirtualHostImpl virtualHost,
+ public PriorityQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
super(virtualHost, attributes, entryList(attributes));
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
deleted file mode 100644
index 5b13c4c574..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.Map;
-
-import org.apache.qpid.server.protocol.AMQSessionModel;
-
-public interface QueueFactory
-{
- AMQQueue createQueue(Map<String, Object> arguments);
-
- AMQQueue restoreQueue(Map<String, Object> arguments);
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 3115f6f581..c495b09d4a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -44,7 +44,7 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements
}
- protected SortedQueueImpl(VirtualHostImpl virtualHost,
+ public SortedQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
this(virtualHost,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index b4a54aa5cb..0484841aa9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -43,6 +43,7 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -69,13 +70,13 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.LastValueQueue;
import org.apache.qpid.server.queue.LastValueQueueImpl;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.PriorityQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.PriorityQueueImpl;
import org.apache.qpid.server.queue.SortedQueue;
+import org.apache.qpid.server.queue.SortedQueueImpl;
+import org.apache.qpid.server.queue.StandardQueueImpl;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -91,12 +92,17 @@ import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener,
VirtualHost<X,AMQQueue<?>, ExchangeImpl<?>>
{
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ private static final int MAX_LENGTH = 255;
+
private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class);
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
@@ -107,12 +113,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final Broker<?> _broker;
- private final QueueRegistry _queueRegistry;
-
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
- private final AMQQueueFactory _queueFactory;
+
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
private volatile VirtualHostState _state = VirtualHostState.INITIALISING;
@@ -182,10 +186,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
- _queueRegistry = new DefaultQueueRegistry(this);
-
- _queueFactory = new AMQQueueFactory(this, _queueRegistry);
-
_defaultDestination = new DefaultDestination(this);
}
@@ -419,11 +419,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Queue.class)
- {
- return (Collection<C>) getQueues();
- }
- else if(clazz == Connection.class)
+ if(clazz == Connection.class)
{
return (Collection<C>) getConnections();
}
@@ -567,15 +563,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _createTime;
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
@Override
public AMQQueue<?> getQueue(String name)
{
- return _queueRegistry.getQueue(name);
+ return (AMQQueue<?>) getChildByName(Queue.class, name);
}
@Override
@@ -588,34 +579,31 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public AMQQueue<?> getQueue(UUID id)
{
- return _queueRegistry.getQueue(id);
+ return (AMQQueue<?>) getChildById(Queue.class, id);
}
@Override
public Collection<AMQQueue<?>> getQueues()
{
- return _queueRegistry.getQueues();
+ Collection children = getChildren(Queue.class);
+ return children;
}
@Override
public int removeQueue(AMQQueue<?> queue)
{
- synchronized (getQueueRegistry())
- {
- int purged = queue.delete();
+ int purged = queue.delete();
- getQueueRegistry().unregisterQueue(queue.getName());
- if (queue.isDurable() && !(queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_SESSION_END))
- {
- DurableConfigurationStore store = getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeQueue(store, queue);
- }
- return purged;
+ if (queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ DurableConfigurationStore store = getDurableConfigurationStore();
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
}
- }
+ return purged;
+}
public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException
{
@@ -656,36 +644,21 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
-
- synchronized (_queueRegistry)
+ if(!attributes.containsKey(Queue.ID))
{
- if(_queueRegistry.getQueue(queueName) != null)
- {
- throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
- }
- if(!attributes.containsKey(Queue.ID))
- {
-
- UUID id = UUID.randomUUID();
- while(_queueRegistry.getQueue(id) != null)
- {
- id = UUID.randomUUID();
- }
- attributes.put(Queue.ID, id);
+ UUID id = UUID.randomUUID();
+ attributes.put(Queue.ID, id);
+ }
- }
- else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
- {
- throw new QueueExistsException("Queue with id "
- + MapValueConverter.getUUIDAttribute(Queue.ID,
- attributes)
- + " already exists", _queueRegistry.getQueue(queueName));
- }
+ boolean createDLQ = shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled());
+ if (createDLQ)
+ {
+ // TODO - this isn't really correct - what if the name has ${foo} in it?
+ validateDLNames(String.valueOf(attributes.get(Queue.NAME)));
+ }
+ return createOrRestoreQueue(attributes, true);
- return _queueFactory.createQueue(attributes);
- }
}
@@ -873,7 +846,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
//Stop Connections
_connectionRegistry.close();
- _queueRegistry.close();
_dtxRegistry.close();
closeStorage();
shutdownHouseKeeping();
@@ -1127,7 +1099,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this, _queueFactory),
+ new QueueRecoverer(this),
new ExchangeRecoverer(this),
new BindingRecoverer(this)
};
@@ -1178,7 +1150,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void execute()
{
- for (AMQQueue<?> q : _queueRegistry.getQueues())
+ for (AMQQueue<?> q : getQueues())
{
if (_logger.isDebugEnabled())
{
@@ -1552,4 +1524,220 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return Collections.unmodifiableCollection(_aliases);
}
+
+ // TODO - remove
+ public AMQQueue restoreQueue(Map<String, Object> attributes)
+ {
+ return createOrRestoreQueue(attributes, false);
+
+ }
+
+
+ private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore)
+ {
+ String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
+ boolean createDLQ = createInStore && shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled());
+ if (createDLQ)
+ {
+ validateDLNames(queueName);
+ }
+
+ AMQQueue queue;
+
+ try
+ {
+
+
+ if (attributes.containsKey(SortedQueue.SORT_KEY))
+ {
+ queue = new SortedQueueImpl(this, attributes);
+ }
+ else if (attributes.containsKey(LastValueQueue.LVQ_KEY))
+ {
+ queue = new LastValueQueueImpl(this, attributes);
+ }
+ else if (attributes.containsKey(PriorityQueue.PRIORITIES))
+ {
+ queue = new PriorityQueueImpl(this, attributes);
+ }
+ else
+ {
+ queue = new StandardQueueImpl(this, attributes);
+ }
+ queue.open();
+ }
+ catch(DuplicateNameException e)
+ {
+
+ throw new QueueExistsException(e.getName(), getQueue(e.getName()));
+ }
+
+ if(createDLQ)
+ {
+ createDLQ(queue);
+ }
+ else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+ {
+
+ final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
+ ExchangeImpl altExchange;
+ try
+ {
+ altExchange = getExchange(UUID.fromString(altExchangeAttr));
+ }
+ catch(IllegalArgumentException e)
+ {
+ altExchange = getExchange(altExchangeAttr);
+ }
+ queue.setAlternateExchange(altExchange);
+ }
+
+ if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
+ }
+
+ return queue;
+ }
+
+
+
+ private void createDLQ(final AMQQueue queue)
+ {
+ final String queueName = queue.getName();
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
+
+ ExchangeImpl dlExchange = null;
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, getName());
+
+ try
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ dlExchange = createExchange(attributes);
+ }
+ catch(ExchangeExistsException e)
+ {
+ // We're ok if the exchange already exists
+ dlExchange = e.getExistingExchange();
+ }
+ catch (ReservedExchangeNameException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ }
+ catch (AMQUnknownExchangeType e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ }
+ catch (UnknownExchangeException e)
+ {
+ throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
+ }
+
+ AMQQueue dlQueue = null;
+
+ {
+ dlQueue = getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
+
+ try
+ {
+
+
+ args.put(Queue.ID, UUID.randomUUID());
+ args.put(Queue.NAME, dlQueueName);
+ args.put(Queue.DURABLE, true);
+ dlQueue = createQueue(args);
+ }
+ catch (QueueExistsException e)
+ {
+ // TODO - currently theoretically for two threads to be creating a queue at the same time.
+ // All model changing operations should be moved to the task executor of the virtual host
+ }
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null);
+ }
+ queue.setAlternateExchange(dlExchange);
+ }
+
+ private static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ + MAX_LENGTH + " characters for queue " + name);
+ }
+ }
+
+ private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled)
+ {
+ boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ arguments,
+ LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
+ {
+ boolean dlqArgumentPresent = arguments != null
+ && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
+ if (dlqArgumentPresent)
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
+ }
+ return dlqEnabled;
+ }
+ return virtualHostDefaultDeadLetterQueueEnabled;
+ }
+ return false;
+ }
+
+ private static String getDeadLetterQueueName(String name)
+ {
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
+ private static String getDeadLetterExchangeName(String name)
+ {
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 788ae8ac9f..1841f1e45f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedDependency;
@@ -41,13 +40,10 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
{
private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
private final VirtualHostImpl _virtualHost;
- private final QueueFactory _queueFactory;
- public QueueRecoverer(final VirtualHostImpl virtualHost,
- final QueueFactory queueFactory)
+ public QueueRecoverer(final VirtualHostImpl virtualHost)
{
_virtualHost = virtualHost;
- _queueFactory = queueFactory;
}
@Override
@@ -110,7 +106,7 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
attributes.put(Queue.ID, _id);
attributes.put(Queue.DURABLE, true);
- _queue = _queueFactory.restoreQueue(attributes);
+ _queue = _virtualHost.restoreQueue(attributes);
}
return _queue;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 91b1a9e408..afb65fa326 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -127,4 +127,7 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
TaskExecutor getTaskExecutor();
EventLogger getEventLogger();
+
+ // TODO - remove
+ public AMQQueue restoreQueue(Map<String, Object> attributes);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 8cbf999dfb..95de8fd9e5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -60,7 +60,6 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
@@ -83,7 +82,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationRecoverer _durableConfigurationRecoverer;
private VirtualHostImpl _vhost;
private DurableConfigurationStore _store;
- private QueueFactory _queueFactory;
private ConfiguredObjectFactory _configuredObjectFactory;
private ConfiguredObjectTypeFactory _exchangeFactory;
@@ -135,9 +133,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
- _queueFactory = mock(QueueFactory.class);
-
- when(_queueFactory.restoreQueue(attributesArg.capture())).then(
+ when(_vhost.restoreQueue(attributesArg.capture())).then(
new Answer()
{
@@ -184,7 +180,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _queueFactory),
+ new QueueRecoverer(_vhost),
new ExchangeRecoverer(_vhost),
new BindingRecoverer(_vhost)
};
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 85eede527a..a58a251fbe 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -652,6 +652,12 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
+ public AMQQueue restoreQueue(final Map<String, Object> attributes)
+ {
+ return null;
+ }
+
+ @Override
public boolean getDefaultDeadLetterQueueEnabled()
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index 8ec78e952e..5313b416c0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -18,158 +18,65 @@
* under the License.
*
*/
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.virtualhost;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueueImpl;
+import org.apache.qpid.server.queue.StandardQueueImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQQueueFactoryTest extends QpidTestCase
+public class VirtualHostQueueCreationTest extends QpidTestCase
{
- private QueueRegistry _queueRegistry;
private VirtualHostImpl _virtualHost;
- private AMQQueueFactory _queueFactory;
- private List<AMQQueue> _queues;
+ private Broker _broker;
@Override
public void setUp() throws Exception
{
super.setUp();
- _queues = new ArrayList<AMQQueue>();
-
- _virtualHost = mock(VirtualHostImpl.class);
- when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
- when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-
- DurableConfigurationStore store = mock(DurableConfigurationStore.class);
- when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
-
- mockExchangeCreation();
- mockQueueRegistry();
- delegateVhostQueueCreation();
-
- when(_virtualHost.getQueues()).thenReturn(_queues);
+ _broker = BrokerTestHelper.createBrokerMock();
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
-
- _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry);
+ _virtualHost = createHost();
+ _virtualHost.open();
}
-
- private void delegateVhostQueueCreation() throws Exception
- {
-
- final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
-
- when(_virtualHost.createQueue(attributes.capture())).then(
- new Answer<AMQQueue>()
- {
- @Override
- public AMQQueue answer(InvocationOnMock invocation) throws Throwable
- {
- return _queueFactory.createQueue(attributes.getValue());
- }
- }
- );
- }
-
- private void mockQueueRegistry()
- {
- _queueRegistry = mock(QueueRegistry.class);
-
- final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class);
- doAnswer(new Answer()
- {
-
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- AMQQueue queue = capturedQueue.getValue();
- when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue);
- when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue);
- when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue);
- when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue);
- _queues.add(queue);
-
- return null;
- }
- }).when(_queueRegistry).registerQueue(capturedQueue.capture());
- }
-
- private void mockExchangeCreation() throws Exception
+ private VirtualHostImpl createHost()
{
- final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
-
-
- when(_virtualHost.createExchange(attributes.capture())).then(
- new Answer<ExchangeImpl>()
- {
- @Override
- public ExchangeImpl answer(InvocationOnMock invocation) throws Throwable
- {
- Map attributeValues = attributes.getValue();
- final String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributeValues);
- final UUID id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributeValues);
-
- final ExchangeImpl exchange = mock(ExchangeImpl.class);
- ExchangeType exType = mock(ExchangeType.class);
-
- when(exchange.getName()).thenReturn(name);
- when(exchange.getId()).thenReturn(id);
- when(exchange.getExchangeType()).thenReturn(exType);
-
- final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues);
- when(exType.getType()).thenReturn(typeName);
- when(exchange.getTypeName()).thenReturn(typeName);
-
- when(_virtualHost.getExchange(eq(name))).thenReturn(exchange);
- when(_virtualHost.getExchange(eq(id))).thenReturn(exchange);
-
- final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class);
-
- when(exchange.addBinding(anyString(), queue.capture(), anyMap())).then(new Answer<Boolean>()
- {
-
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable
- {
- when(exchange.isBound(eq(queue.getValue()))).thenReturn(true);
- return true;
- }
- });
-
- return exchange;
- }
- }
- );
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(VirtualHost.NAME, getName());
+ attributes.put(VirtualHost.TYPE, StandardVirtualHost.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE,
+ TestMemoryMessageStore.TYPE));
+
+ attributes = new HashMap<String, Object>(attributes);
+ attributes.put(VirtualHost.ID, UUID.randomUUID());
+ return new StandardVirtualHost(attributes, _broker);
}
@Override
@@ -198,7 +105,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(PriorityQueue.PRIORITIES, 5);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -209,14 +116,14 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testSimpleQueueRegistration() throws Exception
{
String queueName = getName();
- String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass());
verifyQueueRegistered(queueName);
@@ -229,7 +136,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
+ * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
* cause the alternate exchange to be set and DLQ to be produced.
*/
public void testDeadLetterQueueEnabled() throws Exception
@@ -237,7 +144,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
@@ -248,7 +155,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
ExchangeImpl altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -277,7 +184,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
@@ -288,7 +195,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts());
ExchangeImpl altExchange = queue.getAlternateExchange();
@@ -310,7 +217,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
+ * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
* result in the alternate exchange being set and DLQ being created.
*/
public void testDeadLetterQueueDisabled() throws Exception
@@ -320,7 +227,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
@@ -329,7 +236,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
@@ -341,7 +248,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
+ * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
* creating an auto-delete queue, does not result in the alternate exchange
* being set and DLQ being created.
*/
@@ -350,7 +257,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
@@ -363,7 +270,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
//create an autodelete queue
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Queue should be autodelete",
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
queue.getLifetimePolicy());
@@ -378,7 +285,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
* the desired effect.
*/
public void testMaximumDeliveryCount() throws Exception
@@ -389,7 +296,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = _queueFactory.createQueue(attributes);
+ final AMQQueue queue = _virtualHost.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts());
@@ -398,7 +305,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * Tests that omitting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
* that queue is created with a default maximumDeliveryCount of zero (unless set in config).
*/
public void testMaximumDeliveryCountDefault() throws Exception
@@ -407,7 +314,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
- final AMQQueue queue = _queueFactory.createQueue(attributes);
+ final AMQQueue queue = _virtualHost.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts());
@@ -425,7 +332,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -454,7 +361,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -484,7 +391,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
- _queueFactory.createQueue(attributes);
+ _virtualHost.createQueue(attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -504,7 +411,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true);
- AMQQueue queue = _queueFactory.createQueue(attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index bf36fcbd2e..5e1e38106a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -305,11 +305,11 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
AMQDestination checkQueueDLQ;
if(durableSub)
{
- checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ checkQueueDLQ = new AMQQueue("amq.topic", "clientid" + ":" + getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
}
else
{
- checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ checkQueueDLQ = new AMQQueue("amq.direct", getTestQueueName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
}
assertEquals("The DLQ should have " + expected + " msgs on it", expected,
@@ -323,12 +323,12 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
MessageConsumer consumer;
if(durableSub)
{
- consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ consumer = clientSession.createConsumer(clientSession.createQueue("clientid:" +getName() + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
}
else
{
consumer = clientSession.createConsumer(
- clientSession.createQueue(destName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX));
+ clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
}
//keep track of the message we expect to still be on the DLQ