From a327ecc9225fb303f7fc1305d0e135f331dc7bce Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 29 Jan 2007 12:13:04 +0000 Subject: Fixes to get the python queue tests to work. (NB: currently, auto-delete is not in so tests that re-use the same exclusive queue conflict with each other) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501021 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 4 +- .../handler/ConnectionOpenMethodHandler.java | 8 +- .../qpid/server/handler/MessageConsumeHandler.java | 2 +- .../qpid/server/handler/QueueDeclareHandler.java | 45 +++++++--- .../qpid/server/handler/QueueDeleteHandler.java | 27 ++++-- .../qpid/server/handler/QueuePurgeHandler.java | 81 ++++++++++++++++++ .../server/protocol/AMQMinaProtocolSession.java | 7 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 96 ++++++++++++++++++++-- .../server/queue/ConcurrentDeliveryManager.java | 3 +- .../queue/ConcurrentSelectorDeliveryManager.java | 5 +- .../apache/qpid/server/queue/DeliveryManager.java | 2 +- .../server/queue/SynchronizedDeliveryManager.java | 3 +- .../apache/qpid/server/state/AMQStateManager.java | 1 + .../java/org/apache/qpid/protocol/AMQConstant.java | 2 +- .../qpid/server/queue/AMQQueueMBeanTest.java | 2 +- 15 files changed, 246 insertions(+), 42 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5b6bd24faf..3c2c44d422 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -375,7 +375,7 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -386,7 +386,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal); + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index e3c1b11162..a0105005b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -54,20 +54,18 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener evt) throws AMQException { ConnectionOpenBody body = evt.getMethod(); - String contextKey = body.virtualHost; //todo //FIXME The virtual host must be validated by the server for the connection to open-ok // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (contextKey == null) + if (protocolSession.getContextKey() == null) { - contextKey = generateClientID(); + protocolSession.setContextKey(generateClientID()); } - protocolSession.setContextKey(contextKey); // Be aware of possible changes to parameter order as versions change. AMQMethodBody response = ConnectionOpenOkBody.createMethodBody( protocolSession.getMajor(), // AMQP major version protocolSession.getMinor(), // AMQP minor version - contextKey); // knownHosts + body.virtualHost); // knownHosts protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN); protocolSession.writeResponse(evt, response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index 61da80a2d1..83121e7977 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -84,7 +84,7 @@ public class MessageConsumeHandler implements StateAwareMethodListener +{ + private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); + + public static QueuePurgeHandler getInstance() + { + return _instance; + } + + private final boolean _failIfNotFound; + + public QueuePurgeHandler() + { + this(true); + } + + public QueuePurgeHandler(boolean failIfNotFound) + { + _failIfNotFound = failIfNotFound; + } + + public void methodReceived(AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + { + QueueRegistry queueRegistry = session.getQueueRegistry(); + + QueuePurgeBody body = evt.getMethod(); + AMQQueue queue; + if(body.queue == null) + { + queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified."); + } + + } + } + else + { + queue = queueRegistry.getQueue(body.queue); + } + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + } + } + else + { + long purged = queue.clearQueue(); + + + if(!body.nowait) + { + AMQMethodBody response + = QueuePurgeOkBody.createMethodBody(session.getMajor(), session.getMinor(), purged); + session.writeResponse(evt, response); + + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d12416437d..1d4f67000e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -49,6 +49,7 @@ import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.protocol.AMQMethodEvent; @@ -82,6 +83,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private final IoSession _minaProtocolSession; private String _contextKey; @@ -666,6 +669,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void setClientProperties(FieldTable clientProperties) { _clientProperties = clientProperties; + if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) + { + setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)); + } } public QueueRegistry getQueueRegistry() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..a3c4fb1820 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -34,6 +34,8 @@ import javax.management.JMException; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like @@ -41,6 +43,28 @@ import java.util.concurrent.Executor; */ public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException + { + + public ExistingExclusiveSubscription() + { + super(""); + } + } + + public static final class ExistingSubscriptionPreventsExclusive extends AMQException + { + + public ExistingSubscriptionPreventsExclusive() + { + super(""); + } + } + + private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription(); + private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive(); + + private static final Logger _logger = Logger.getLogger(AMQQueue.class); private final String _name; @@ -64,6 +88,10 @@ public class AMQQueue implements Managable, Comparable private final SubscriptionFactory _subscriptionFactory; + private final AtomicInteger _subscriberCount = new AtomicInteger(); + + private final AtomicBoolean _isExclusive = new AtomicBoolean(); + /** * Manages message delivery. */ @@ -352,9 +380,9 @@ public class AMQQueue implements Managable, Comparable /** * removes all the messages from the queue. */ - public void clearQueue() throws AMQException + public long clearQueue() throws AMQException { - _deliveryMgr.clearAllMessages(); + return _deliveryMgr.clearAllMessages(); } public void bind(String routingKey, Exchange exchange) @@ -362,14 +390,29 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException - { - registerProtocolSession(ps, channel, consumerTag, acks, filters, false); - } - - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { + if(incrementSubscriberCount() > 1) + { + if(isExclusive()) + { + decrementSubscriberCount(); + throw EXISTING_EXCLUSIVE; + } + else if(exclusive) + { + decrementSubscriberCount(); + throw EXISTING_SUBSCRIPTION; + } + + } + else if(exclusive) + { + setExclusive(true); + } + debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); @@ -378,13 +421,33 @@ public class AMQQueue implements Managable, Comparable { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } _subscribers.addSubscriber(subscription); } + private boolean isExclusive() + { + return _isExclusive.get(); + } + + private void setExclusive(boolean exclusive) + { + _isExclusive.set(exclusive); + } + + private int incrementSubscriberCount() + { + return _subscriberCount.incrementAndGet(); + } + + private int decrementSubscriberCount() + { + return _subscriberCount.decrementAndGet(); + } + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException { debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, @@ -400,6 +463,10 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + setExclusive(false); + decrementSubscriberCount(); + + // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -410,6 +477,17 @@ public class AMQQueue implements Managable, Comparable } } + public boolean isUnused() + { + return _subscribers.isEmpty(); + } + + public boolean isEmpty() + { + return !_deliveryMgr.hasQueuedMessages(); + } + + public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException { if (checkUnused && !_subscribers.isEmpty()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index 2e49a9d4b1..560b881ee3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -217,7 +217,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { AMQMessage msg = poll(); while (msg != null) @@ -225,6 +225,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager msg.dequeue(_queue); msg = poll(); } + return 0; } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 74f69030e0..5b3bf2bbeb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -176,14 +176,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { + long count = 0; AMQMessage msg = poll(); while (msg != null) { msg.dequeue(_queue); + count++; msg = poll(); } + return count; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index cac499587f..936afcde10 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -70,7 +70,7 @@ interface DeliveryManager void removeAMessageFromTop() throws AMQException; - void clearAllMessages() throws AMQException; + long clearAllMessages() throws AMQException; List getMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index c967ea2cde..54cd3013ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -136,7 +136,7 @@ class SynchronizedDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { AMQMessage msg = poll(); while (msg != null) @@ -144,6 +144,7 @@ class SynchronizedDeliveryManager implements DeliveryManager msg.dequeue(_queue); msg = poll(); } + return 0; } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 54be424d50..6f158c89d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -134,6 +134,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); + frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance()); frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance()); frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance()); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index a0d243ca30..8ea0d6ef1a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -92,7 +92,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 7c5de67c5e..4352f56d90 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null); _protocolSession.addChannel(_channel); - _queue.registerProtocolSession(_protocolSession, 1, "test", false, null); + _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; -- cgit v1.2.1