From ab6fffad2230229810c995253a6f021e42e03aaf Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 18 Aug 2013 09:13:02 +0000 Subject: QPID-5081 : [Java Broker] Refactor Queue Creation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68 --- .../protocol/v0_10/ServerSessionDelegate.java | 252 +++++++++------------ .../server/protocol/v0_10/Subscription_0_10.java | 11 +- .../v0_8/handler/BasicConsumeMethodHandler.java | 2 +- .../v0_8/handler/BasicGetMethodHandler.java | 2 +- .../v0_8/handler/ExchangeBoundHandler.java | 5 +- .../protocol/v0_8/handler/QueueBindHandler.java | 9 +- .../protocol/v0_8/handler/QueueDeclareHandler.java | 179 ++++++++------- .../protocol/v0_8/handler/QueueDeleteHandler.java | 10 +- .../protocol/v0_8/handler/QueuePurgeHandler.java | 3 +- .../protocol/v0_8/handler/QueueUnbindHandler.java | 4 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 16 +- .../qpid/server/protocol/v1_0/Session_1_0.java | 24 +- .../server/jmx/mbeans/VirtualHostManagerMBean.java | 10 +- .../jmx/mbeans/VirtualHostManagerMBeanTest.java | 7 +- 14 files changed, 250 insertions(+), 284 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8e79813216..60211823f8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,24 +20,26 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.LinkedHashMap; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; @@ -61,6 +63,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; import org.apache.qpid.transport.*; import java.nio.ByteBuffer; @@ -72,11 +75,6 @@ public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); - /** - * No-local queue argument is used to support the no-local feature of Durable Subscribers. - */ - private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local"; - public ServerSessionDelegate() { @@ -195,10 +193,9 @@ public class ServerSessionDelegate extends SessionDelegate else { String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); + VirtualHost vhost = getVirtualHost(session); - - final AMQQueue queue = queueRegistry.getQueue(queueName); + final AMQQueue queue = vhost.getQueue(queueName); if(queue == null) { @@ -929,7 +926,6 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -947,7 +943,7 @@ public class ServerSessionDelegate extends SessionDelegate { method.setBindingKey(method.getQueue()); } - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -991,7 +987,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeUnbind(Session session, ExchangeUnbind method) { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -1007,7 +1002,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -1174,158 +1169,137 @@ public class ServerSessionDelegate extends SessionDelegate private AMQQueue getQueue(Session session, String queue) { - QueueRegistry queueRegistry = getQueueRegistry(session); - return queueRegistry.getQueue(queue); - } - - private QueueRegistry getQueueRegistry(Session session) - { - return getVirtualHost(session).getQueueRegistry(); + return getVirtualHost(session).getQueue(queue); } @Override public void queueDeclare(Session session, final QueueDeclare method) { - VirtualHost virtualHost = getVirtualHost(session); + final VirtualHost virtualHost = getVirtualHost(session); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); AMQQueue queue; - QueueRegistry queueRegistry = getQueueRegistry(session); //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - synchronized (queueRegistry) + final boolean exclusive = method.getExclusive(); + final boolean autoDelete = method.getAutoDelete(); + + if(method.getPassive()) { + queue = virtualHost.getQueue(queueName); - if (((queue = queueRegistry.getQueue(queueName)) == null)) + if (queue == null) { + String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; - if (method.getPassive()) - { - String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; + exception(session, method, errorCode, description); - exception(session, method, errorCode, description); + } + else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { + String description = "Cannot declare queue('" + queueName + "')," + + " as exclusive queue with same name " + + "declared on another session"; + ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - return; - } - else + exception(session, method, errorCode, description); + + } + } + else + { + + try + { + + String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; + final String alternateExchangeName = method.getAlternateExchange(); + + + final Map arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments()); + + if(alternateExchangeName != null && alternateExchangeName.length() != 0) { - try - { - queue = createQueue(queueName, method, virtualHost, (ServerSession)session); - if(!method.getExclusive() && method.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } + arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName); + } - final String alternateExchangeName = method.getAlternateExchange(); - if(alternateExchangeName != null && alternateExchangeName.length() != 0) - { - Exchange alternate = getExchange(session, alternateExchangeName); - queue.setAlternateExchange(alternate); - } + final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - if(method.hasArguments() && method.getArguments() != null) - { - if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL)) - { - Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL); - queue.setNoLocal(convertBooleanValue(noLocal)); - } - } + final boolean deleteOnNoConsumer = !exclusive && autoDelete; + queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, + autoDelete, exclusive, deleteOnNoConsumer, + arguments); - if (queue.isDurable() && !queue.isAutoDelete()) + if (autoDelete && exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task deleteQueueTask = new ServerSession.Task() { - if(method.hasArguments() && method.getArguments() != null) + public void doTask(ServerSession session) { - Map args = method.getArguments(); - FieldTable ftArgs = new FieldTable(); - for(Map.Entry entry : args.entrySet()) + try + { + virtualHost.removeQueue(q); + } + catch (AMQException e) { - ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); + exception(session, method, e, "Cannot delete '" + method.getQueue()); } - DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs); } - else + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(deleteQueueTask); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - DurableConfigurationStoreHelper.createQueue(store, queue, null); + s.removeSessionCloseTask(deleteQueueTask); } - } - queueRegistry.registerQueue(queue); - - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + }); + } + if (exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task removeExclusive = new ServerSession.Task() + { + public void doTask(ServerSession session) { - final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - try - { - q.delete(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete '" + method.getQueue()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + q.setAuthorizationHolder(null); + q.setExclusiveOwningSession(null); } - if (method.hasExclusive() - && method.getExclusive()) + }; + final ServerSession s = (ServerSession) session; + q.setExclusiveOwningSession(s); + s.addSessionCloseTask(removeExclusive); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(removeExclusive); - } - }); + s.removeSessionCloseTask(removeExclusive); } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare queue '" + queueName); - } + }); } } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + catch(QueueExistsException qe) { + queue = qe.getExistingQueue(); + if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; exception(session, method, errorCode, description); - - return; + } + } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare queue '" + queueName); } } } @@ -1354,20 +1328,6 @@ public class ServerSessionDelegate extends SessionDelegate return false; } - protected AMQQueue createQueue(final String queueName, - final QueueDeclare body, - VirtualHost virtualHost, - final ServerSession session) - throws AMQException - { - String owner = body.getExclusive() ? session.getClientID() : null; - - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner, - body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - - return queue; - } - @Override public void queueDelete(Session session, QueueDelete method) { @@ -1412,12 +1372,7 @@ public class ServerSessionDelegate extends SessionDelegate try { - queue.delete(); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store,queue); - } + virtualHost.removeQueue(queue); } catch (AMQException e) { @@ -1471,7 +1426,14 @@ public class ServerSessionDelegate extends SessionDelegate result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); result.setAutoDelete(queue.isAutoDelete()); - result.setArguments(queue.getArguments()); + Map arguments = new LinkedHashMap(); + Collection availableAttrs = queue.getAvailableAttributes(); + + for(String attrName : availableAttrs) + { + arguments.put(attrName, queue.getAttribute(attrName)); + } + result.setArguments(QueueArgumentsConverter.convertModelArgsToWire(arguments)); result.setMessageCount(queue.getMessageCount()); result.setSubscriberCount(queue.getConsumerCount()); @@ -1491,7 +1453,7 @@ public class ServerSessionDelegate extends SessionDelegate if(sub == null) { - exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'"); } else if(sub.isStopped()) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index c6bceb6ac7..63582702cb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; @@ -33,6 +30,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.message.InboundMessage; @@ -40,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -65,7 +64,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -169,9 +167,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _queue = queue; - Map arguments = queue.getArguments(); - _traceExclude = (String) arguments.get("qpid.trace.exclude"); - _trace = (String) arguments.get("qpid.trace.id"); + _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); + _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); String filterLogString = null; _logActor = GenericActor.getInstance(this); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 6577efe292..4e620327c9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -73,7 +73,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) @@ -73,7 +72,9 @@ public class QueueBindHandler implements StateAwareMethodListener final AMQQueue queue; final AMQShortString routingKey; - if (body.getQueue() == null) + final AMQShortString queueName = body.getQueue(); + + if (queueName == null) { queue = channel.getDefaultQueue(); @@ -94,13 +95,13 @@ public class QueueBindHandler implements StateAwareMethodListener } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(queueName.toString()); routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); } if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); final Exchange exch = virtualHost.getExchange(exchangeName); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 9f887d881d..fd547d4bac 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; @@ -44,6 +45,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public class QueueDeclareHandler implements StateAwareMethodListener { @@ -61,8 +63,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener arguments = FieldTable.convertToMap(body.getArguments()); + final boolean durable = body.getDurable(); + final boolean autoDelete = body.getAutoDelete(); + final boolean exclusive = body.getExclusive(); + + String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; + + Map arguments = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); String queueNameString = AMQShortString.toString(queueName); + final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()), - queueNameString, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(),virtualHost, arguments); + final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, + exclusive, autoDelete, arguments); - if (body.getExclusive() && !body.getDurable()) + if (exclusive && !durable) { final AMQProtocolSession.Task deleteQueueTask = new AMQProtocolSession.Task() { public void doTask(AMQProtocolSession session) throws AMQException { - if (registry.getQueue(queueName) == queue) + if (virtualHost.getQueue(queueName.toString()) == queue) { - queue.delete(); + virtualHost.removeQueue(queue); } } }; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 6f5e0ea992..a39faf2e70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -62,7 +62,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener filters = source.getFilter(); @@ -194,19 +196,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS name = UUID.randomUUID().toString(); } - queue = _vhost.getQueueRegistry().getQueue(name); + queue = _vhost.getQueue(name); Exchange exchange = exchangeDestination.getExchange(); if(queue == null) { - queue = AMQQueueFactory.createAMQQueueImpl( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, null, true, true, - _vhost, + true, Collections.EMPTY_MAP); } else @@ -309,11 +311,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { @@ -417,7 +419,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - queue.delete(); + queue.getVirtualHost().removeQueue(queue); } catch(AMQException e) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ed75a8c165..d3962c779c 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -107,7 +107,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -256,7 +256,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } else { - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -329,14 +329,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - _vhost, - properties); + final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), + queueName, + false, // durable + null, // owner + false, // autodelete + false, // exclusive + false, + properties); @@ -347,11 +347,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 67ac1bdc7c..2c88f83405 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; @MBeanDescription("This MBean exposes the broker level management features") public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean implements ManagedBroker @@ -180,7 +181,8 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean createArgs = processNewQueueArguments(queueName, owner, originalArguments); - getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs); + getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, + QueueArgumentsConverter.convertWireArgsToModel(createArgs)); } @@ -196,11 +198,11 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean(arguments == null ? new HashMap() : arguments); - if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION)) + if (!argumentsCopy.containsKey(QueueArgumentsConverter.X_QPID_DESCRIPTION)) { - LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION); - argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner); + argumentsCopy.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, owner); } else { diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index e3fac9f711..4240dd5280 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; public class VirtualHostManagerMBeanTest extends TestCase { @@ -79,16 +80,16 @@ public class VirtualHostManagerMBeanTest extends TestCase { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); - Map expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER); + Map expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception { - Map arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); - Map expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } -- cgit v1.2.1