summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-12-09 23:58:25 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-12-09 23:58:25 +0000
commitdb6241e3cfb4ef420025ba5c8b8ddae888c7171c (patch)
treee5aec74133fb1e4e13fdac632de81a863e270dd1 /qpid/java/broker/src/main
parentf1f7698a041f6534b4c3396f90c3352549ec95f9 (diff)
downloadqpid-python-db6241e3cfb4ef420025ba5c8b8ddae888c7171c.tar.gz
QPID-2258 : AMQP0-9-1 Compliance fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java74
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java5
14 files changed, 124 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 4788f96d6c..3c3902c545 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange
final String routingKey = payload.getRoutingKey();
- final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
+ final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") : _index.get(routingKey);
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 859a3477e6..0343457a73 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.getNoLocal(), body.getNowait(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
}
if (body.getConsumerTag() != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index a473184efb..2c4a9b310a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -97,6 +97,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
if (!performGet(queue,session, channel, !body.getNoAck()))
{
@@ -188,6 +193,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ public boolean isTransient()
+ {
+ return true;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return !getCreditManager().useCreditForMessage(msg.getMessage());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
index bd4b610933..8dbd457cc9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
@@ -60,6 +60,10 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
try
{
+ if(exchangeRegistry.getExchange(body.getExchange()) == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+ }
exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
@@ -68,6 +72,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
}
catch (ExchangeInUseException e)
{
+ throw body.getChannelException(AMQConstant.IN_USE, "Exchange in use");
// TODO: sort out consistent channel close mechanism that does all clean up etc.
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 84491c1d2e..57ce7a7240 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -113,6 +113,11 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
if (!exch.isBound(routingKey, body.getArguments(), queue))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 4f69afe755..bb57fdbc36 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
{
store.createQueue(queue, body.getArguments());
}
+ if(body.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
queueRegistry.registerQueue(queue);
- if(queue.isExclusive() && !queue.isAutoDelete())
+ if(body.getExclusive())
{
- final AMQQueue q = queue;
- queue.setExclusiveOwner(session);
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ if(body.getDurable())
{
- public void doTask(AMQProtocolSession session) throws AMQException
- {
- q.setExclusiveOwner(null);
- }
- };
- session.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.setExclusiveOwner(session.getPrincipal().getName());
+ }
+ else
+ {
+ final AMQQueue q = queue;
+ queue.setExclusiveOwner(session);
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- session.removeSessionCloseTask(sessionCloseTask);
- }
- });
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ q.setExclusiveOwner(null);
+ }
+ };
+ session.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ session.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
+ }
+
}
if (autoRegister)
{
@@ -143,11 +155,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if (queue.getPrincipalHolder() != null
- && queue.getPrincipalHolder().getPrincipal() != null
- && queue.getPrincipalHolder().getPrincipal().getName() != null
- && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
- || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
+ else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ }
+ else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -155,6 +175,20 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
+ queue.getPrincipalHolder().getPrincipal().getName() + "')");
}
+ else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
+ + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ }
+ else if(!body.getPassive() && queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: "
+ + queue.isDurable() + " requested " + body.getDurable() + ")");
+ }
+
+
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 8417492171..3d58ec2133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -110,7 +110,11 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
-
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
int purged = queue.delete();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 3e0f2182b7..b94ebb6538 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -103,6 +103,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if (!virtualHost.getAccessManager().authorisePurge(session, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
}
long purged = queue.clearQueue();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a459c64946..028f7e15a4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -42,6 +42,9 @@ import java.util.Map;
public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
{
+ boolean getDeleteOnNoConsumers();
+
+ void setDeleteOnNoConsumers(boolean b);
public interface Context
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 6915850376..3d5d99f0b0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private boolean _nolocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private boolean _deleteOnNoConsumers;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
{
@@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new ExistingExclusiveSubscription();
}
- if (exclusive)
+ if (exclusive && !subscription.isTransient())
{
if (getConsumerCount() != 0)
{
@@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getConsumerCount() == 0 && !isExclusive())
+ if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public boolean getDeleteOnNoConsumers()
+ {
+ return _deleteOnNoConsumers;
+ }
+
+ public void setDeleteOnNoConsumers(boolean b)
+ {
+ _deleteOnNoConsumers = b;
+ }
+
+
// ------ Enqueue / Dequeue
public QueueEntry enqueue(ServerMessage message) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 4db9c305b2..9e9d2da579 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -31,6 +31,8 @@ public interface Subscription
{
LogActor getLogActor();
+ boolean isTransient();
+
public static enum State
{
ACTIVE,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 5302a3c5d4..684d3c2e74 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return !isBrowser();
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
public void set(String key, Object value)
{
_properties.put(key, value);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index fb0a5cf2c7..5b3668ab64 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _logActor;
}
+ public boolean isTransient()
+ {
+ return false;
+ }
+
ServerSession getSession()
{
return _session;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index df2754c16b..36ed8e24ce 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate
queue.setPrincipalHolder((ServerSession)session);
queue.setExclusiveOwner(session);
}
+ else if(method.getAutoDelete())
+ {
+ queue.setDeleteOnNoConsumers(true);
+ }
+
final String alternateExchangeName = method.getAlternateExchange();
if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{