From 63c953d881df3ba843fc9f2c8667faafe84a9e9a Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Mon, 22 Jan 2007 10:26:03 +0000 Subject: QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@498574 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/handler/BasicGetMethodHandler.java | 77 ++++++++++++++++++++ .../qpid/server/handler/QueuePurgeHandler.java | 81 ++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java new file mode 100644 index 0000000000..1eb3152973 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -0,0 +1,77 @@ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; + +public class BasicGetMethodHandler implements StateAwareMethodListener +{ + private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); + + private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); + + public static BasicGetMethodHandler getInstance() + { + return _instance; + } + + private BasicGetMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession session, + AMQMethodEvent evt) throws AMQException + { + BasicGetBody body = evt.getMethod(); + final int channelId = evt.getChannelId(); + + AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + _log.error("Channel " + channelId + " not found"); + // TODO: either alert or error that the + } + else + { + AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); + + if (queue == null) + { + _log.info("No queue for '" + body.queue + "'"); + if(body.queue!=null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), + "No such queue, '" + body.queue + "'"); + } + else + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + "No queue name provided, no default queue defined."); + } + } + else + { + if(!queue.performGet(session, channel, !body.noAck)) + { + + + // TODO - set clusterId + session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null)); + } + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java new file mode 100644 index 0000000000..ad63d36351 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -0,0 +1,81 @@ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +public class QueuePurgeHandler implements StateAwareMethodListener +{ + private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); + + public static QueuePurgeHandler getInstance() + { + return _instance; + } + + private final boolean _failIfNotFound; + + public QueuePurgeHandler() + { + this(true); + } + + public QueuePurgeHandler(boolean failIfNotFound) + { + _failIfNotFound = failIfNotFound; + } + + public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + { + QueuePurgeBody body = evt.getMethod(); + AMQQueue queue; + if(body.queue == null) + { + queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified."); + } + + } + } + else + { + queue = queues.getQueue(body.queue); + } + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + } + } + else + { + long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext()); + + + if(!body.nowait) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + purged)); // messageCount + } + } + } +} -- cgit v1.2.1