diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:00:39 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 13:00:39 +0000 |
| commit | 267709b3bb825f6cfb652510307687bc67f3dbb0 (patch) | |
| tree | 4a87d5e6b71ece9a51c3c46e5a6748d1d26f210d /java/broker | |
| parent | 0c7c587920299fdc5b2639e5ed7efc143a83c532 (diff) | |
| download | qpid-python-267709b3bb825f6cfb652510307687bc67f3dbb0.tar.gz | |
QPID-11 remove protocol literals from code.
QPID-376 use of getChannel() does not correct handle error cases when null is returned.
Updated AMQMethodBody - to have a convenience method getChannelNotFoundException to be used for QPID-376 when channel is null.
This allows the replyCode NOT_FOUND=404 to be changed to changed easily if required.
QPID-376 - Updated All Handlers to throw channel exception when channel is null.
QPID-11 Updated all handlers to use AMQConstant values rather than hardcoded literals.
- Updated AMQException to use AMQConstant values rather than int to ensure that no more literal values creep back in to the code base. Replaced all usages of int above framing to store replycode with AMQConstant to prevent creep.
Had to create new constants for literals used in code base but not yet part of spec.
405=Already Exists
406=In Use
323=Invalid Routing Key
Remove non spec constant
500=Unknown_Exchange_Name replaced with generic NOT_FOUND
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508381 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
23 files changed, 193 insertions, 92 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8b36576a30..0879b77f37 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -602,7 +602,7 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage())); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index b85e3603b7..820f0122f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -21,6 +21,7 @@ package org.apache.qpid.server; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; /** @@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException return _amqMessage; } - public int getErrorCode() + public AMQConstant getErrorCode() { return getReplyCode(); } - public abstract int getReplyCode(); + public abstract AMQConstant getReplyCode(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index 6688318a0a..f93b2b25e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); - + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); } BasicAckBody body = evt.getMethod(); final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + // this method throws an AMQException if the delivery tag is not known channel.acknowledgeMessage(body.deliveryTag, body.multiple); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 1e56542b2b..7d18043f5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); final BasicCancelBody body = evt.getMethod(); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.unsubscribeConsumer(protocolSession, body.consumerTag); - if(!body.nowait) + if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - body.consumerTag); // consumerTag + (byte) 8, (byte) 0, // AMQP version (major, minor) + body.consumerTag); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index feb6f6b1fa..090988d304 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -61,11 +61,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); + VirtualHost vHost = session.getVirtualHost(); + if (channel == null) { - _log.error("Channel " + channelId + " not found"); - // TODO: either alert or error that the + throw body.getChannelNotFoundException(evt.getChannelId()); } else { @@ -78,12 +79,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg); } } else @@ -108,24 +109,24 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); + throw body.getChannelException(AMQConstant.INVALID_SELECTOR, ise.getMessage()); } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + body.consumerTag + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " exclusively as it already has a consumer"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 72d7e8b8b9..b88c2ebf3a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -39,8 +39,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -51,12 +50,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB _log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
"No such queue, '" + body.queue + "'");
}
else
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"No queue name provided, no default queue defined.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index a30cc2ca3c..7e378dfd01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -70,8 +71,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // if the exchange does not exist we raise a channel exception if (e == null) { - throw body.getChannelException(500, "Unknown exchange name"); - + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); } else { @@ -79,6 +79,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setPublishFrame(body, session); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 4bc1439e53..3cd6a87f64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> { @@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0)); + session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 9f0d096a73..5f5b7ccad1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); AMQChannel channel = session.getChannel(evt.getChannelId()); + BasicRecoverBody body = evt.getMethod(); + if (channel == null) { - throw new AMQException("Unknown channel " + evt.getChannelId()); + throw body.getChannelNotFoundException(evt.getChannelId()); } - BasicRecoverBody body = evt.getMethod(); - channel.resend(session, body.requeue); + channel.resend(session, body.requeue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index bdb877b7ac..bfa170cfc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -52,6 +52,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB ChannelFlowBody body = evt.getMethod(); AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 809676cfbe..a85af61327 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -71,7 +71,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con if(virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 575833a68f..be3ffcc698 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { if(body.passive && ((body.type == null) || body.type.length() ==0)) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange); } else { @@ -89,14 +89,14 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } catch(AMQUnknownExchangeType e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e); } } } else if (!exchange.getType().equals(body.type)) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index bccb9db967..3c903b471d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; @@ -35,6 +36,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + +// if (channel == null) +// { +// throw body.getChannelNotFoundException(evt.getChannelId()); +// } + + queue = channel.getDefaultQueue(); + if (queue == null) { - throw new AMQException("No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } + if (body.routingKey == null) { body.routingKey = queue.getName(); @@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } final Exchange exch = exchangeRegistry.getExchange(body.exchange); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); + } + try + { + exch.registerQueue(body.routingKey, queue, body.arguments); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString()); + } + catch (AMQException e) + { + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); } - exch.registerQueue(body.routingKey, queue, body.arguments); queue.bind(body.routingKey, exch); if (_log.isInfoEnabled()) { @@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 1e4b7c9e57..2218ff604f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar body.queue = createName(); } - AMQQueue queue = null; + AMQQueue queue; //TODO: do we need to check that the queue already exists with exactly the same "configuration"? synchronized (queueRegistry) @@ -94,8 +95,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if(body.passive) { String msg = "Queue: " + body.queue + " not found."; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg ); - + throw body.getChannelException(AMQConstant.NOT_FOUND,msg ); } else { @@ -116,12 +116,18 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { - // todo - constant - throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + } + + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); } + //set this as the default queue on the channel: - session.getChannel(evt.getChannelId()).setDefaultQueue(queue); + channel.setDefaultQueue(queue); } if (!body.nowait) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 4c875692f0..0c7de312a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -31,6 +32,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete AMQQueue queue; if(body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); } else { @@ -76,19 +86,19 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { if(_failIfNotFound) { - throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } } else { if(body.ifEmpty && !queue.isEmpty()) { - throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." ); } else if(body.ifUnused && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(406, "Queue: " + body.queue + " is still used." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." ); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 3ccc61fff0..0c00436470 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -11,6 +11,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
}
-
}
}
else
@@ -62,12 +72,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod {
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
- long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index caf0efad67..3d7ec286f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -47,7 +47,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> _log.debug("Commit received on channel " + evt.getChannelId()); } AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); channel.processReturns(session); } - catch(AMQException e) + catch (AMQException e) { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 9088240351..8ce5a0ea73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - try{ + + try + { AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(session, false); - }catch(AMQException e){ + } + catch (AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 29795e50ca..a9e478e301 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { @@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - session.getChannel(evt.getChannelId()).setLocalTransactional(); + + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setLocalTransactional(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e53410420f..309fa4663a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); } @@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2049189e0f..c63490f019 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException super("Immediate delivery is not possible.", message); } - public int getReplyCode() + public AMQConstant getReplyCode() { - return AMQConstant.NO_CONSUMERS.getCode(); + return AMQConstant.NO_CONSUMERS; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 05841ccfc0..6bdfeccc0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,6 +24,8 @@ import java.util.Queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; @@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** - * Encapsulation of a supscription to a queue. - * <p/> - * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - * <p/> + * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> */ public class SubscriptionImpl implements Subscription { @@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; @@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { - throw new NullPointerException("channel not found in protocol session"); + { + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; @@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException @@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription } queue.dequeue(storeContext, msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription Object localInstance; Object msgInstance; - if((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { @@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - consumerTag // consumerTag - )); + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + consumerTag // consumerTag + )); _closed = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 6d1e9ce99d..29efdd9513 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener && (protocolSession.getChannel(evt.getChannelId()) == null) && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { - throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId()); + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } } |
