diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-12-09 23:58:25 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-12-09 23:58:25 +0000 |
| commit | db6241e3cfb4ef420025ba5c8b8ddae888c7171c (patch) | |
| tree | e5aec74133fb1e4e13fdac632de81a863e270dd1 /qpid/java/broker/src/main | |
| parent | f1f7698a041f6534b4c3396f90c3352549ec95f9 (diff) | |
| download | qpid-python-db6241e3cfb4ef420025ba5c8b8ddae888c7171c.tar.gz | |
QPID-2258 : AMQP0-9-1 Compliance fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
14 files changed, 124 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4788f96d6c..3c3902c545 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange final String routingKey = payload.getRoutingKey(); - final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 859a3477e6..0343457a73 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.getNoLocal(), body.getNowait(), queue)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); + } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); } if (body.getConsumerTag() != null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index a473184efb..2c4a9b310a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -97,6 +97,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB {
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
if (!performGet(queue,session, channel, !body.getNoAck()))
{
@@ -188,6 +193,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ public boolean isTransient()
+ {
+ return true;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return !getCreditManager().useCreditForMessage(msg.getMessage());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index bd4b610933..8dbd457cc9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -60,6 +60,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD try { + if(exchangeRegistry.getExchange(body.getExchange()) == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); + } exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused()); ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody(); @@ -68,6 +72,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD } catch (ExchangeInUseException e) { + throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use"); // TODO: sort out consistent channel close mechanism that does all clean up etc. } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 84491c1d2e..57ce7a7240 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -113,6 +113,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } if (!exch.isBound(routingKey, body.getArguments(), queue)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 4f69afe755..bb57fdbc36 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar { store.createQueue(queue, body.getArguments()); } + if(body.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } queueRegistry.registerQueue(queue); - if(queue.isExclusive() && !queue.isAutoDelete()) + if(body.getExclusive()) { - final AMQQueue q = queue; - queue.setExclusiveOwner(session); - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() + if(body.getDurable()) { - public void doTask(AMQProtocolSession session) throws AMQException - { - q.setExclusiveOwner(null); - } - }; - session.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException + queue.setExclusiveOwner(session.getPrincipal().getName()); + } + else + { + final AMQQueue q = queue; + queue.setExclusiveOwner(session); + final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { - session.removeSessionCloseTask(sessionCloseTask); - } - }); + public void doTask(AMQProtocolSession session) throws AMQException + { + q.setExclusiveOwner(null); + } + }; + session.addSessionCloseTask(sessionCloseTask); + queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException + { + session.removeSessionCloseTask(sessionCloseTask); + } + }); + } + } if (autoRegister) { @@ -143,11 +155,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } - else if (queue.getPrincipalHolder() != null - && queue.getPrincipalHolder().getPrincipal() != null - && queue.getPrincipalHolder().getPrincipal().getName() != null - && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName()) - || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session)))) + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } + else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive())) + { + + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " + + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + } + else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session)) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -155,6 +175,20 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar + queue.getPrincipalHolder().getPrincipal().getName() + "')"); } + else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: " + + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + } + else if(!body.getPassive() && queue.isDurable() != body.getDurable()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: " + + queue.isDurable() + " requested " + body.getDurable() + ")"); + } + + AMQChannel channel = session.getChannel(channelId); if (channel == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 8417492171..3d58ec2133 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -110,7 +110,11 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } int purged = queue.delete(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 3e0f2182b7..b94ebb6538 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -103,6 +103,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if (!virtualHost.getAccessManager().authorisePurge(session, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
}
long purged = queue.clearQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a459c64946..028f7e15a4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -42,6 +42,9 @@ import java.util.Map; public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource { + boolean getDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(boolean b); public interface Context diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6915850376..3d5d99f0b0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean _nolocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); + private boolean _deleteOnNoConsumers; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) { @@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if (exclusive) + if (exclusive && !subscription.isTransient()) { if (getConsumerCount() != 0) { @@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getConsumerCount() == 0 && !isExclusive()) + if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public boolean getDeleteOnNoConsumers() + { + return _deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean b) + { + _deleteOnNoConsumers = b; + } + + // ------ Enqueue / Dequeue public QueueEntry enqueue(ServerMessage message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 4db9c305b2..9e9d2da579 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -31,6 +31,8 @@ public interface Subscription { LogActor getLogActor(); + boolean isTransient(); + public static enum State { ACTIVE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 5302a3c5d4..684d3c2e74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !isBrowser(); } + public boolean isTransient() + { + return false; + } + public void set(String key, Object value) { _properties.put(key, value); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index fb0a5cf2c7..5b3668ab64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _logActor; } + public boolean isTransient() + { + return false; + } + ServerSession getSession() { return _session; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index df2754c16b..36ed8e24ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate queue.setPrincipalHolder((ServerSession)session); queue.setExclusiveOwner(session); } + else if(method.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } + final String alternateExchangeName = method.getAlternateExchange(); if(alternateExchangeName != null && alternateExchangeName.length() != 0) { |
