From db6241e3cfb4ef420025ba5c8b8ddae888c7171c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 9 Dec 2009 23:58:25 +0000 Subject: QPID-2258 : AMQP0-9-1 Compliance fixes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@889022 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/DirectExchange.java | 2 +- .../server/handler/BasicConsumeMethodHandler.java | 5 ++ .../qpid/server/handler/BasicGetMethodHandler.java | 10 +++ .../qpid/server/handler/ExchangeDeleteHandler.java | 5 ++ .../qpid/server/handler/QueueBindHandler.java | 5 ++ .../qpid/server/handler/QueueDeclareHandler.java | 74 ++++++++++++++++------ .../qpid/server/handler/QueueDeleteHandler.java | 6 +- .../qpid/server/handler/QueuePurgeHandler.java | 5 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 3 + .../apache/qpid/server/queue/SimpleAMQQueue.java | 16 ++++- .../qpid/server/subscription/Subscription.java | 2 + .../qpid/server/subscription/SubscriptionImpl.java | 5 ++ .../server/subscription/Subscription_0_10.java | 5 ++ .../server/transport/ServerSessionDelegate.java | 5 ++ .../org/apache/qpid/server/queue/MockAMQQueue.java | 9 +++ .../qpid/server/queue/SimpleAMQQueueTest.java | 1 + .../qpid/server/subscription/MockSubscription.java | 5 ++ .../handler/ConnectionStartMethodHandler.java | 8 +++ .../qpid/client/protocol/AMQProtocolSession.java | 9 +-- .../client/MessageListenerMultiConsumerTest.java | 4 +- .../server/logging/DurableQueueLoggingTest.java | 16 +++-- .../org/apache/qpid/server/queue/PriorityTest.java | 6 +- .../qpid/server/queue/ProducerFlowControlTest.java | 16 +++-- .../qpid/server/queue/SubscriptionTestHelper.java | 5 ++ .../test/client/message/MessageToStringTest.java | 12 +++- .../test/client/message/ObjectMessageTest.java | 9 ++- 26 files changed, 197 insertions(+), 51 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4788f96d6c..3c3902c545 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -212,7 +212,7 @@ public class DirectExchange extends AbstractExchange final String routingKey = payload.getRoutingKey(); - final ArrayList queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList queues = (routingKey == null) ? _index.get("") : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 859a3477e6..0343457a73 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -102,6 +102,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } if (!exch.isBound(routingKey, body.getArguments(), queue)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 4f69afe755..bb57fdbc36 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -114,25 +114,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener, ExchangeReferrer, TransactionLogResource { + boolean getDeleteOnNoConsumers(); + + void setDeleteOnNoConsumers(boolean b); public interface Context diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6915850376..3d5d99f0b0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -150,6 +150,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean _nolocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); + private boolean _deleteOnNoConsumers; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) { @@ -374,7 +375,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new ExistingExclusiveSubscription(); } - if (exclusive) + if (exclusive && !subscription.isTransient()) { if (getConsumerCount() != 0) { @@ -431,7 +432,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getConsumerCount() == 0 && !isExclusive()) + if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -448,6 +449,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public boolean getDeleteOnNoConsumers() + { + return _deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean b) + { + _deleteOnNoConsumers = b; + } + + // ------ Enqueue / Dequeue public QueueEntry enqueue(ServerMessage message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 4db9c305b2..9e9d2da579 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -31,6 +31,8 @@ public interface Subscription { LogActor getLogActor(); + boolean isTransient(); + public static enum State { ACTIVE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 5302a3c5d4..684d3c2e74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -667,6 +667,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !isBrowser(); } + public boolean isTransient() + { + return false; + } + public void set(String key, Object value) { _properties.put(key, value); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index fb0a5cf2c7..5b3668ab64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -649,6 +649,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _logActor; } + public boolean isTransient() + { + return false; + } + ServerSession getSession() { return _session; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index df2754c16b..36ed8e24ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -823,6 +823,11 @@ public class ServerSessionDelegate extends SessionDelegate queue.setPrincipalHolder((ServerSession)session); queue.setExclusiveOwner(session); } + else if(method.getAutoDelete()) + { + queue.setDeleteOnNoConsumers(true); + } + final String alternateExchangeName = method.getAlternateExchange(); if(alternateExchangeName != null && alternateExchangeName.length() != 0) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 910c7d42ed..a487b160e1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -52,6 +52,15 @@ public class MockAMQQueue implements AMQQueue _name = new AMQShortString(name); } + public boolean getDeleteOnNoConsumers() + { + return false; + } + + public void setDeleteOnNoConsumers(boolean b) + { + } + public AMQShortString getName() { return _name; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 8c6574095b..408893870b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -259,6 +259,7 @@ public class SimpleAMQQueueTest extends TestCase { _queue.stop(); _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost); + _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); _queue.enqueue(message); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 97ba143bdf..e6fd2172f0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -105,6 +105,11 @@ public class MockSubscription implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isTransient() + { + return false; + } + public AMQQueue getQueue() { return queue; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 8857f1115a..c9212a54c1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -68,6 +68,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener arguments = new HashMap(); arguments.put("x-qpid-priorities", PRIORITIES); // Need to create a queue that does not exist so use test name - ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), false, _durable, false, arguments); + final String queueName = getTestQueueName(); + ((AMQSession) _session).createQueue(new AMQShortString(queueName), false, _durable, false, arguments); + + Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='false'"); + //Need to create a Consumer to ensure that the log has had time to write // as the above Create is Asynchronous - _session.createConsumer(_session.createQueue(getTestQueueName())); + _session.createConsumer(queue); // Validation List results = _monitor.findMatches(QUEUE_PREFIX); @@ -310,11 +314,15 @@ public class DurableQueueLoggingTest extends AbstractTestLogging final Map arguments = new HashMap(); arguments.put("x-qpid-priorities", PRIORITIES); // Need to create a queue that does not exist so use test name - ((AMQSession) _session).createQueue(new AMQShortString(getTestQueueName()), true, _durable, false, arguments); + final String queueName = getTestQueueName() + "-autoDeletePriority"; + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, _durable, false, arguments); + + Queue queue = (Queue) _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='"+_durable+"'&autodelete='true'"); + //Need to create a Consumer to ensure that the log has had time to write // as the above Create is Asynchronous - _session.createConsumer(_session.createQueue(getTestQueueName())); + _session.createConsumer(queue); // Validation List results = _monitor.findMatches(QUEUE_PREFIX); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 35b4d7c772..ca38807fb1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -90,7 +90,8 @@ public class PriorityTest extends QpidTestCase final Map arguments = new HashMap(); arguments.put("x-qpid-priorities",10); ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -130,7 +131,8 @@ public class PriorityTest extends QpidTestCase final Map arguments = new HashMap(); arguments.put("x-qpid-priorities",3); ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index e6be7c8263..ecb2f7d559 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -107,7 +107,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -149,7 +149,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -194,7 +194,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -224,7 +224,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",1000); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -266,7 +266,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'"); ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); @@ -322,7 +322,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -354,7 +354,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",0); arguments.put("x-qpid-flow-resume-capacity",0); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = new AMQQueue("amq.direct",queueName); + + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fe25bf07f0..352f6ad119 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -188,6 +188,11 @@ public class SubscriptionTestHelper implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isTransient() + { + return false; + } + public AMQQueue getQueue() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java index 1744b92d62..39861bb2d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/MessageToStringTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.test.client.message; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.framing.AMQShortString; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -57,12 +59,16 @@ public class MessageToStringTest extends QpidTestCase //Create Producer put some messages on the queue _connection = getConnection(); - //Create Queue - _queue = new AMQQueue("amq.direct", "queue"); - //Create Consumer _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = getTestQueueName(); + + //Create Queue + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false); + _queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + + _consumer = _session.createConsumer(_queue); _connection.start(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java index 001a40988b..f0bbcc7003 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/ObjectMessageTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.test.client.message; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.framing.AMQShortString; import javax.jms.Connection; import javax.jms.JMSException; @@ -46,12 +48,15 @@ public class ObjectMessageTest extends QpidTestCase //Create Connection _connection = getConnection(); - //Create Queue - Queue queue = new AMQQueue("amq.direct", "queue"); //Create Session _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //Create Queue + String queueName = getTestQueueName(); + ((AMQSession) _session).createQueue(new AMQShortString(queueName), true, false, false); + Queue queue = _session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + //Create Consumer _consumer = _session.createConsumer(queue); -- cgit v1.2.1