From db6241e3cfb4ef420025ba5c8b8ddae888c7171c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 9 Dec 2009 23:58:25 +0000 Subject: QPID-2258 : AMQP0-9-1 Compliance fixes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/DirectExchange.java | 2 +- .../server/handler/BasicConsumeMethodHandler.java | 5 ++ .../qpid/server/handler/BasicGetMethodHandler.java | 10 +++ .../qpid/server/handler/ExchangeDeleteHandler.java | 5 ++ .../qpid/server/handler/QueueBindHandler.java | 5 ++ .../qpid/server/handler/QueueDeclareHandler.java | 74 ++++++++++++++++------ .../qpid/server/handler/QueueDeleteHandler.java | 6 +- .../qpid/server/handler/QueuePurgeHandler.java | 5 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 3 + .../apache/qpid/server/queue/SimpleAMQQueue.java | 16 ++++- .../qpid/server/subscription/Subscription.java | 2 + .../qpid/server/subscription/SubscriptionImpl.java | 5 ++ .../server/subscription/Subscription_0_10.java | 5 ++ .../server/transport/ServerSessionDelegate.java | 5 ++ 14 files changed, 124 insertions(+), 24 deletions(-) (limited to 'qpid/java/broker/src/main') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4788f96d6c..3c3902c545 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange final String routingKey = payload.getRoutingKey(); - final ArrayList queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList queues = (routingKey == null) ? _index.get("") : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 859a3477e6..0343457a73 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } if (!exch.isBound(routingKey, body.getArguments(), queue)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 4f69afe755..bb57fdbc36 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener, ExchangeReferrer, TransactionLogResource { + boolean getDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(boolean b); public interface Context diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6915850376..3d5d99f0b0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean _nolocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); + private boolean _deleteOnNoConsumers; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) { @@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if (exclusive) + if (exclusive && !subscription.isTransient()) { if (getConsumerCount() != 0) { @@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getConsumerCount() == 0 && !isExclusive()) + if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public boolean getDeleteOnNoConsumers() + { + return _deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean b) + { + _deleteOnNoConsumers = b; + } + + // ------ Enqueue / Dequeue public QueueEntry enqueue(ServerMessage message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 4db9c305b2..9e9d2da579 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -31,6 +31,8 @@ public interface Subscription { LogActor getLogActor(); + boolean isTransient(); + public static enum State { ACTIVE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 5302a3c5d4..684d3c2e74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !isBrowser(); } + public boolean isTransient() + { + return false; + } + public void set(String key, Object value) { _properties.put(key, value); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index fb0a5cf2c7..5b3668ab64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _logActor; } + public boolean isTransient() + { + return false; + } + ServerSession getSession() { return _session; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index df2754c16b..36ed8e24ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate queue.setPrincipalHolder((ServerSession)session); queue.setExclusiveOwner(session); } + else if(method.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } + final String alternateExchangeName = method.getAlternateExchange(); if(alternateExchangeName != null && alternateExchangeName.length() != 0) { -- cgit v1.2.1