From 267709b3bb825f6cfb652510307687bc67f3dbb0 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 16 Feb 2007 13:00:39 +0000 Subject: QPID-11 remove protocol literals from code. QPID-376 use of getChannel() does not correct handle error cases when null is returned. Updated AMQMethodBody - to have a convenience method getChannelNotFoundException to be used for QPID-376 when channel is null. This allows the replyCode NOT_FOUND=404 to be changed to changed easily if required. QPID-376 - Updated All Handlers to throw channel exception when channel is null. QPID-11 Updated all handlers to use AMQConstant values rather than hardcoded literals. - Updated AMQException to use AMQConstant values rather than int to ensure that no more literal values creep back in to the code base. Replaced all usages of int above framing to store replycode with AMQConstant to prevent creep. Had to create new constants for literals used in code base but not yet part of spec. 405=Already Exists 406=In Use 323=Invalid Routing Key Remove non spec constant 500=Unknown_Exchange_Name replaced with generic NOT_FOUND git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508381 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 2 +- .../qpid/server/RequiredDeliveryException.java | 5 ++- .../qpid/server/handler/BasicAckMethodHandler.java | 8 +++- .../server/handler/BasicCancelMethodHandler.java | 12 ++++-- .../server/handler/BasicConsumeMethodHandler.java | 17 ++++---- .../qpid/server/handler/BasicGetMethodHandler.java | 7 ++-- .../server/handler/BasicPublishMethodHandler.java | 10 ++++- .../qpid/server/handler/BasicQosHandler.java | 13 ++++-- .../server/handler/BasicRecoverMethodHandler.java | 7 ++-- .../qpid/server/handler/ChannelFlowHandler.java | 6 +++ .../handler/ConnectionOpenMethodHandler.java | 2 +- .../server/handler/ExchangeDeclareHandler.java | 6 +-- .../qpid/server/handler/QueueBindHandler.java | 35 ++++++++++++---- .../qpid/server/handler/QueueDeclareHandler.java | 18 ++++++--- .../qpid/server/handler/QueueDeleteHandler.java | 18 +++++++-- .../qpid/server/handler/QueuePurgeHandler.java | 20 +++++++--- .../qpid/server/handler/TxCommitHandler.java | 12 ++++-- .../qpid/server/handler/TxRollbackHandler.java | 17 ++++++-- .../qpid/server/handler/TxSelectHandler.java | 15 +++++-- .../server/protocol/AMQMinaProtocolSession.java | 2 + .../qpid/server/queue/NoConsumersException.java | 4 +- .../apache/qpid/server/queue/SubscriptionImpl.java | 46 ++++++++++------------ .../apache/qpid/server/state/AMQStateManager.java | 3 +- 23 files changed, 193 insertions(+), 92 deletions(-) (limited to 'java/broker/src/main') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8b36576a30..0879b77f37 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -602,7 +602,7 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage())); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index b85e3603b7..820f0122f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -21,6 +21,7 @@ package org.apache.qpid.server; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; /** @@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException return _amqMessage; } - public int getErrorCode() + public AMQConstant getErrorCode() { return getReplyCode(); } - public abstract int getReplyCode(); + public abstract AMQConstant getReplyCode(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index 6688318a0a..f93b2b25e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); - + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); } BasicAckBody body = evt.getMethod(); final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + // this method throws an AMQException if the delivery tag is not known channel.acknowledgeMessage(body.deliveryTag, body.multiple); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 1e56542b2b..7d18043f5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener { @@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0)); + session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 9f0d096a73..5f5b7ccad1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener { @@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + +// if (channel == null) +// { +// throw body.getChannelNotFoundException(evt.getChannelId()); +// } + + queue = channel.getDefaultQueue(); + if (queue == null) { - throw new AMQException("No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } + if (body.routingKey == null) { body.routingKey = queue.getName(); @@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } final Exchange exch = exchangeRegistry.getExchange(body.exchange); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); + } + try + { + exch.registerQueue(body.routingKey, queue, body.arguments); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString()); + } + catch (AMQException e) + { + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); } - exch.registerQueue(body.routingKey, queue, body.arguments); queue.bind(body.routingKey, exch); if (_log.isInfoEnabled()) { @@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 1e4b7c9e57..2218ff604f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeclareHandler implements StateAwareMethodListener { @@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener { @@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener { @@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener _log.debug("Commit received on channel " + evt.getChannelId()); } AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); channel.processReturns(session); } - catch(AMQException e) + catch (AMQException e) { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 9088240351..8ce5a0ea73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - try{ + + try + { AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(session, false); - }catch(AMQException e){ + } + catch (AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 29795e50ca..a9e478e301 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class TxSelectHandler implements StateAwareMethodListener { @@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - session.getChannel(evt.getChannelId()).setLocalTransactional(); + + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setLocalTransactional(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e53410420f..309fa4663a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); } @@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2049189e0f..c63490f019 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException super("Immediate delivery is not possible.", message); } - public int getReplyCode() + public AMQConstant getReplyCode() { - return AMQConstant.NO_CONSUMERS.getCode(); + return AMQConstant.NO_CONSUMERS; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 05841ccfc0..6bdfeccc0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,6 +24,8 @@ import java.util.Queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; @@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** - * Encapsulation of a supscription to a queue. - *

- * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - *

+ * Encapsulation of a supscription to a queue.

Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id.

*/ public class SubscriptionImpl implements Subscription { @@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; @@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { - throw new NullPointerException("channel not found in protocol session"); + { + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; @@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException @@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription } queue.dequeue(storeContext, msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription Object localInstance; Object msgInstance; - if((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { @@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - consumerTag // consumerTag - )); + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + consumerTag // consumerTag + )); _closed = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 6d1e9ce99d..29efdd9513 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener && (protocolSession.getChannel(evt.getChannelId()) == null) && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { - throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId()); + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } } -- cgit v1.2.1