diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:00:39 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:00:39 +0000 |
commit | 267709b3bb825f6cfb652510307687bc67f3dbb0 (patch) | |
tree | 4a87d5e6b71ece9a51c3c46e5a6748d1d26f210d | |
parent | 0c7c587920299fdc5b2639e5ed7efc143a83c532 (diff) | |
download | qpid-python-267709b3bb825f6cfb652510307687bc67f3dbb0.tar.gz |
QPID-11 remove protocol literals from code.
QPID-376 use of getChannel() does not correct handle error cases when null is returned.
Updated AMQMethodBody - to have a convenience method getChannelNotFoundException to be used for QPID-376 when channel is null.
This allows the replyCode NOT_FOUND=404 to be changed to changed easily if required.
QPID-376 - Updated All Handlers to throw channel exception when channel is null.
QPID-11 Updated all handlers to use AMQConstant values rather than hardcoded literals.
- Updated AMQException to use AMQConstant values rather than int to ensure that no more literal values creep back in to the code base. Replaced all usages of int above framing to store replycode with AMQConstant to prevent creep.
Had to create new constants for literals used in code base but not yet part of spec.
405=Already Exists
406=In Use
323=Invalid Routing Key
Remove non spec constant
500=Unknown_Exchange_Name replaced with generic NOT_FOUND
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508381 13f79535-47bb-0310-9956-ffa450edef68
43 files changed, 325 insertions, 150 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8b36576a30..0879b77f37 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -602,7 +602,7 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage())); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index b85e3603b7..820f0122f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -21,6 +21,7 @@ package org.apache.qpid.server; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; /** @@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException return _amqMessage; } - public int getErrorCode() + public AMQConstant getErrorCode() { return getReplyCode(); } - public abstract int getReplyCode(); + public abstract AMQConstant getReplyCode(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index 6688318a0a..f93b2b25e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); - + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); } BasicAckBody body = evt.getMethod(); final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + // this method throws an AMQException if the delivery tag is not known channel.acknowledgeMessage(body.deliveryTag, body.multiple); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 1e56542b2b..7d18043f5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); final BasicCancelBody body = evt.getMethod(); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.unsubscribeConsumer(protocolSession, body.consumerTag); - if(!body.nowait) + if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - body.consumerTag); // consumerTag + (byte) 8, (byte) 0, // AMQP version (major, minor) + body.consumerTag); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index feb6f6b1fa..090988d304 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -61,11 +61,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); + VirtualHost vHost = session.getVirtualHost(); + if (channel == null) { - _log.error("Channel " + channelId + " not found"); - // TODO: either alert or error that the + throw body.getChannelNotFoundException(evt.getChannelId()); } else { @@ -78,12 +79,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg); } } else @@ -108,24 +109,24 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); + throw body.getChannelException(AMQConstant.INVALID_SELECTOR, ise.getMessage()); } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + body.consumerTag + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " exclusively as it already has a consumer"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 72d7e8b8b9..b88c2ebf3a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -39,8 +39,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -51,12 +50,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB _log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
"No such queue, '" + body.queue + "'");
}
else
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"No queue name provided, no default queue defined.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index a30cc2ca3c..7e378dfd01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -70,8 +71,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // if the exchange does not exist we raise a channel exception if (e == null) { - throw body.getChannelException(500, "Unknown exchange name"); - + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); } else { @@ -79,6 +79,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setPublishFrame(body, session); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 4bc1439e53..3cd6a87f64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> { @@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0)); + session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 9f0d096a73..5f5b7ccad1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); AMQChannel channel = session.getChannel(evt.getChannelId()); + BasicRecoverBody body = evt.getMethod(); + if (channel == null) { - throw new AMQException("Unknown channel " + evt.getChannelId()); + throw body.getChannelNotFoundException(evt.getChannelId()); } - BasicRecoverBody body = evt.getMethod(); - channel.resend(session, body.requeue); + channel.resend(session, body.requeue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index bdb877b7ac..bfa170cfc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -52,6 +52,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB ChannelFlowBody body = evt.getMethod(); AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 809676cfbe..a85af61327 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -71,7 +71,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con if(virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 575833a68f..be3ffcc698 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { if(body.passive && ((body.type == null) || body.type.length() ==0)) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange); } else { @@ -89,14 +89,14 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } catch(AMQUnknownExchangeType e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e); } } } else if (!exchange.getType().equals(body.type)) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index bccb9db967..3c903b471d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; @@ -35,6 +36,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + +// if (channel == null) +// { +// throw body.getChannelNotFoundException(evt.getChannelId()); +// } + + queue = channel.getDefaultQueue(); + if (queue == null) { - throw new AMQException("No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } + if (body.routingKey == null) { body.routingKey = queue.getName(); @@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } final Exchange exch = exchangeRegistry.getExchange(body.exchange); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); + } + try + { + exch.registerQueue(body.routingKey, queue, body.arguments); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString()); + } + catch (AMQException e) + { + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); } - exch.registerQueue(body.routingKey, queue, body.arguments); queue.bind(body.routingKey, exch); if (_log.isInfoEnabled()) { @@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 1e4b7c9e57..2218ff604f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar body.queue = createName(); } - AMQQueue queue = null; + AMQQueue queue; //TODO: do we need to check that the queue already exists with exactly the same "configuration"? synchronized (queueRegistry) @@ -94,8 +95,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if(body.passive) { String msg = "Queue: " + body.queue + " not found."; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg ); - + throw body.getChannelException(AMQConstant.NOT_FOUND,msg ); } else { @@ -116,12 +116,18 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { - // todo - constant - throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + } + + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); } + //set this as the default queue on the channel: - session.getChannel(evt.getChannelId()).setDefaultQueue(queue); + channel.setDefaultQueue(queue); } if (!body.nowait) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 4c875692f0..0c7de312a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -31,6 +32,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete AMQQueue queue; if(body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); } else { @@ -76,19 +86,19 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { if(_failIfNotFound) { - throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } } else { if(body.ifEmpty && !queue.isEmpty()) { - throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." ); } else if(body.ifUnused && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(406, "Queue: " + body.queue + " is still used." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." ); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 3ccc61fff0..0c00436470 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -11,6 +11,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
}
-
}
}
else
@@ -62,12 +72,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod {
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
- long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index caf0efad67..3d7ec286f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -47,7 +47,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> _log.debug("Commit received on channel " + evt.getChannelId()); } AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); channel.processReturns(session); } - catch(AMQException e) + catch (AMQException e) { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 9088240351..8ce5a0ea73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - try{ + + try + { AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(session, false); - }catch(AMQException e){ + } + catch (AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 29795e50ca..a9e478e301 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { @@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - session.getChannel(evt.getChannelId()).setLocalTransactional(); + + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setLocalTransactional(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e53410420f..309fa4663a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); } @@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2049189e0f..c63490f019 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException super("Immediate delivery is not possible.", message); } - public int getReplyCode() + public AMQConstant getReplyCode() { - return AMQConstant.NO_CONSUMERS.getCode(); + return AMQConstant.NO_CONSUMERS; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 05841ccfc0..6bdfeccc0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,6 +24,8 @@ import java.util.Queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; @@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** - * Encapsulation of a supscription to a queue. - * <p/> - * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - * <p/> + * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> */ public class SubscriptionImpl implements Subscription { @@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; @@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { - throw new NullPointerException("channel not found in protocol session"); + { + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; @@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException @@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription } queue.dequeue(storeContext, msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription Object localInstance; Object msgInstance; - if((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { @@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - consumerTag // consumerTag - )); + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + consumerTag // consumerTag + )); _closed = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 6d1e9ce99d..29efdd9513 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener && (protocolSession.getChannel(evt.getChannelId()) == null) && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { - throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId()); + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 52dcfcfbfb..0bc474f6e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -21,10 +21,11 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQAuthenticationException extends AMQException { - public AMQAuthenticationException(int error, String msg) + public AMQAuthenticationException(AMQConstant error, String msg) { super(error,msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index cb2533c2bb..ebaa22ce44 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -966,7 +966,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause); + je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java index 277e3f7eaf..bec2958cb9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java @@ -28,7 +28,7 @@ public class AMQNoConsumersException extends AMQUndeliveredException { public AMQNoConsumersException(String msg, Object bounced) { - super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced); + super(AMQConstant.NO_CONSUMERS, msg, bounced); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java index 0e84ad75f2..6ea8413446 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java @@ -28,7 +28,7 @@ public class AMQNoRouteException extends AMQUndeliveredException { public AMQNoRouteException(String msg, Object bounced) { - super(AMQConstant.NO_ROUTE.getCode(), msg, bounced); + super(AMQConstant.NO_ROUTE, msg, bounced); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index d011d02a91..6ef187286b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; @@ -52,34 +53,39 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener _logger.debug("ChannelClose method received"); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); - int errorCode = method.replyCode; + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); protocolSession.writeFrame(frame); - if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) + if (errorCode != AMQConstant.REPLY_SUCCESS) { _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); - if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) + if (errorCode == AMQConstant.NO_CONSUMERS) { throw new AMQNoConsumersException("Error: " + reason, null); } - else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + else if (errorCode == AMQConstant.NO_ROUTE) { throw new AMQNoRouteException("Error: " + reason, null); } - else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode()) + else if (errorCode == AMQConstant.INVALID_SELECTOR) { - _logger.info("Broker responded with Invalid Selector."); + _logger.debug("Broker responded with Invalid Selector."); throw new AMQInvalidSelectorException(String.valueOf(reason)); } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } else { throw new AMQChannelClosedException(errorCode, "Error: " + reason); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index f928ab56eb..57d987712a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -57,16 +57,16 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener // does it matter //stateManager.changeState(AMQState.CONNECTION_CLOSING); - int errorCode = method.replyCode; + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; // TODO: check whether channel id of zero is appropriate // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor())); - if (errorCode != 200) + if (errorCode != AMQConstant.REPLY_SUCCESS) { - if(errorCode == AMQConstant.NOT_ALLOWED.getCode()) + if(errorCode == AMQConstant.NOT_ALLOWED) { _logger.info("Authentication Error:"+Thread.currentThread().getName()); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java index 51a9aa7226..21526ac6d2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.message; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class UnexpectedBodyReceivedException extends AMQException { @@ -36,7 +37,7 @@ public class UnexpectedBodyReceivedException extends AMQException super(logger, msg); } - public UnexpectedBodyReceivedException(Logger logger, int errorCode, String msg) + public UnexpectedBodyReceivedException(Logger logger, AMQConstant errorCode, String msg) { super(logger, errorCode, msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index d4d700966a..055109d3be 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -48,6 +48,7 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.protocol.AMQConstant; /** * Wrapper for protocol session that provides type-safe access to session attributes. @@ -389,7 +390,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP * initiated the channel close, false if the channel close is just the server * responding to the client's earlier request to close the channel. */ - public boolean channelClosed(int channelId, int code, String text) throws AMQException + public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { final Integer chId = channelId; // if this is not a response to an earlier request to close the channel diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 9b214e88f9..f957df2c34 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -63,7 +63,7 @@ public class DurableSubscriptionTest extends TestCase Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); - con.start(); + con.start(); producer.send(session1.createTextMessage("A")); diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java index cd8b40c6da..272933ca04 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java @@ -20,12 +20,14 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * AMQ channel closed exception. */ public class AMQChannelClosedException extends AMQException { - public AMQChannelClosedException(int errorCode, String msg) + public AMQChannelClosedException(AMQConstant errorCode, String msg) { super(errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index d1750ebbb5..d8c9b287bd 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -23,6 +23,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.protocol.AMQConstant; public class AMQChannelException extends AMQException { @@ -32,7 +33,7 @@ public class AMQChannelException extends AMQException private final byte major; private final byte minor; - public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { super(errorCode, msg, t); _classId = classId; @@ -41,7 +42,7 @@ public class AMQChannelException extends AMQException this.minor = minor; } - public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) + public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) { super(errorCode, msg); _classId = classId; @@ -52,6 +53,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); + return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java index 6ec18bad20..e0ed16a9f0 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -20,12 +20,14 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * AMQ channel closed exception. */ public class AMQConnectionClosedException extends AMQException { - public AMQConnectionClosedException(int errorCode, String msg) + public AMQConnectionClosedException(AMQConstant errorCode, String msg) { super(errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index c6a874bcf3..c4f80191a3 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -24,6 +24,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.protocol.AMQConstant; public class AMQConnectionException extends AMQException { @@ -34,7 +35,7 @@ public class AMQConnectionException extends AMQException private final byte minor; boolean _closeConnetion; - public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { super(errorCode, msg, t); _classId = classId; @@ -43,7 +44,7 @@ public class AMQConnectionException extends AMQException this.minor = minor; } - public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) + public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) { super(errorCode, msg); _classId = classId; @@ -56,7 +57,7 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); + return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 93c31e4fa8..5c11ec18ca 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -21,31 +21,36 @@ package org.apache.qpid; import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; /** * Generic AMQ exception. */ public class AMQException extends Exception { - private int _errorCode; + private AMQConstant _errorCode; public AMQException(String message) { super(message); + //fixme This method needs removed and all AMQExceptions need a valid error code + _errorCode = AMQConstant.getConstant(-1); } public AMQException(String msg, Throwable t) { super(msg, t); + //fixme This method needs removed and all AMQExceptions need a valid error code + _errorCode = AMQConstant.getConstant(-1); } - public AMQException(int errorCode, String msg, Throwable t) + public AMQException(AMQConstant errorCode, String msg, Throwable t) { super(msg + " [error code " + errorCode + ']', t); _errorCode = errorCode; } - public AMQException(int errorCode, String msg) + public AMQException(AMQConstant errorCode, String msg) { super(msg + " [error code " + errorCode + ']'); _errorCode = errorCode; @@ -63,13 +68,13 @@ public class AMQException extends Exception logger.error(getMessage(), this); } - public AMQException(Logger logger, int errorCode, String msg) + public AMQException(Logger logger, AMQConstant errorCode, String msg) { this(errorCode, msg); logger.error(getMessage(), this); } - public int getErrorCode() + public AMQConstant getErrorCode() { return _errorCode; } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java new file mode 100644 index 0000000000..3293e2523d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +public class AMQInvalidRoutingKeyException extends AMQException +{ + public AMQInvalidRoutingKeyException(String message) + { + super(AMQConstant.INVALID_ROUTING_KEY,message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java index dcd039b789..9d003514ad 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java @@ -26,6 +26,6 @@ public class AMQInvalidSelectorException extends AMQException { public AMQInvalidSelectorException(String message) { - super(AMQConstant.INVALID_SELECTOR.getCode(),message); + super(AMQConstant.INVALID_SELECTOR,message); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java index 4944ccc371..ad5aff7bb6 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -20,6 +20,8 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * Generic AMQ exception. */ @@ -27,7 +29,7 @@ public class AMQUndeliveredException extends AMQException { private Object _bounced; - public AMQUndeliveredException(int errorCode, String msg, Object bounced) + public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced) { super(errorCode, msg); diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 67af0b0b74..958f59191f 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -22,6 +22,7 @@ package org.apache.qpid.configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; /** * Indicates an error parsing a property expansion. @@ -38,12 +39,12 @@ public class PropertyException extends AMQException super(msg, t); } - public PropertyException(int errorCode, String msg, Throwable t) + public PropertyException(AMQConstant errorCode, String msg, Throwable t) { super(errorCode, msg, t); } - public PropertyException(int errorCode, String msg) + public PropertyException(AMQConstant errorCode, String msg) { super(errorCode, msg); } @@ -58,7 +59,7 @@ public class PropertyException extends AMQException super(logger, msg); } - public PropertyException(Logger logger, int errorCode, String msg) + public PropertyException(Logger logger, AMQConstant errorCode, String msg) { super(logger, errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 3fa5b150ab..111d9a8f20 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -23,19 +23,26 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; public abstract class AMQMethodBody extends AMQBody { - public static final byte TYPE = 1; - - /** - * AMQP version - */ + public static final byte TYPE = 1; + + /** AMQP version */ protected byte major; protected byte minor; - public byte getMajor() { return major; } - public byte getMinor() { return minor; } - + + public byte getMajor() + { + return major; + } + + public byte getMinor() + { + return minor; + } + public AMQMethodBody(byte major, byte minor) { this.major = major; @@ -45,14 +52,10 @@ public abstract class AMQMethodBody extends AMQBody /** unsigned short */ protected abstract int getBodySize(); - /** - * @return unsigned short - */ + /** @return unsigned short */ protected abstract int getClazz(); - /** - * @return unsigned short - */ + /** @return unsigned short */ protected abstract int getMethod(); protected abstract void writeMethodPayload(ByteBuffer buffer); @@ -90,27 +93,38 @@ public abstract class AMQMethodBody extends AMQBody } /** - * Creates an AMQChannelException for the corresponding body type (a channel exception - * should include the class and method ids of the body it resulted from). + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException */ - public AMQChannelException getChannelException(int code, String message) + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); } - public AMQChannelException getChannelException(int code, String message, Throwable cause) + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); } - public AMQConnectionException getConnectionException(int code, String message) + public AMQConnectionException getConnectionException(AMQConstant code, String message) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); } - - - public AMQConnectionException getConnectionException(int code, String message, Throwable cause) + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index a4d90e9ee3..05365de137 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -69,7 +69,7 @@ public final class AMQConstant public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true); public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true); - + public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true); public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); @@ -78,12 +78,18 @@ public final class AMQConstant public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true); + public static final AMQConstant INVALID_ROUTING_KEY = new AMQConstant(323, "routing key invalid", true); + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + public static final AMQConstant ALREADY_EXISTS = new AMQConstant(405, "Already exists", true); + + public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true); + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); |