summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-16 13:00:39 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-16 13:00:39 +0000
commit267709b3bb825f6cfb652510307687bc67f3dbb0 (patch)
tree4a87d5e6b71ece9a51c3c46e5a6748d1d26f210d /java/broker
parent0c7c587920299fdc5b2639e5ed7efc143a83c532 (diff)
downloadqpid-python-267709b3bb825f6cfb652510307687bc67f3dbb0.tar.gz
QPID-11 remove protocol literals from code.
QPID-376 use of getChannel() does not correct handle error cases when null is returned. Updated AMQMethodBody - to have a convenience method getChannelNotFoundException to be used for QPID-376 when channel is null. This allows the replyCode NOT_FOUND=404 to be changed to changed easily if required. QPID-376 - Updated All Handlers to throw channel exception when channel is null. QPID-11 Updated all handlers to use AMQConstant values rather than hardcoded literals. - Updated AMQException to use AMQConstant values rather than int to ensure that no more literal values creep back in to the code base. Replaced all usages of int above framing to store replycode with AMQConstant to prevent creep. Had to create new constants for literals used in code base but not yet part of spec. 405=Already Exists 406=In Use 323=Invalid Routing Key Remove non spec constant 500=Unknown_Exchange_Name replaced with generic NOT_FOUND git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508381 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java3
23 files changed, 193 insertions, 92 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8b36576a30..0879b77f37 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -602,7 +602,7 @@ public class AMQChannel
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
+ message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index b85e3603b7..820f0122f5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.AMQMessage;
/**
@@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException
return _amqMessage;
}
- public int getErrorCode()
+ public AMQConstant getErrorCode()
{
return getReplyCode();
}
- public abstract int getReplyCode();
+ public abstract AMQConstant getReplyCode();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index 6688318a0a..f93b2b25e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
AMQProtocolSession protocolSession = stateManager.getProtocolSession();
-
+
if (_log.isDebugEnabled())
{
_log.debug("Ack received on channel " + evt.getChannelId());
}
BasicAckBody body = evt.getMethod();
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
// this method throws an AMQException if the delivery tag is not known
channel.acknowledgeMessage(body.deliveryTag, body.multiple);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 1e56542b2b..7d18043f5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
final BasicCancelBody body = evt.getMethod();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
- if(!body.nowait)
+ if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- body.consumerTag); // consumerTag
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ body.consumerTag); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index feb6f6b1fa..090988d304 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -61,11 +61,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
+
VirtualHost vHost = session.getVirtualHost();
+
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -78,12 +79,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
{
String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
}
}
else
@@ -108,24 +109,24 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage());
+ throw body.getChannelException(AMQConstant.INVALID_SELECTOR, ise.getMessage());
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Non-unique consumer tag, '" + body.consumerTag + "'");
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
- throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
{
- throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " exclusively as it already has a consumer");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 72d7e8b8b9..b88c2ebf3a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -39,8 +39,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -51,12 +50,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
_log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
"No such queue, '" + body.queue + "'");
}
else
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"No queue name provided, no default queue defined.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index a30cc2ca3c..7e378dfd01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -70,8 +71,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// if the exchange does not exist we raise a channel exception
if (e == null)
{
- throw body.getChannelException(500, "Unknown exchange name");
-
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
}
else
{
@@ -79,6 +79,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.setPublishFrame(body, session);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 4bc1439e53..3cd6a87f64 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
{
@@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
+ channel.setPrefetchCount(evt.getMethod().prefetchCount);
+ channel.setPrefetchSize(evt.getMethod().prefetchSize);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0));
+ session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 9f0d096a73..5f5b7ccad1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
_logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
AMQChannel channel = session.getChannel(evt.getChannelId());
+ BasicRecoverBody body = evt.getMethod();
+
if (channel == null)
{
- throw new AMQException("Unknown channel " + evt.getChannelId());
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
- BasicRecoverBody body = evt.getMethod();
- channel.resend(session, body.requeue);
+ channel.resend(session, body.requeue);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index bdb877b7ac..bfa170cfc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -52,6 +52,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
ChannelFlowBody body = evt.getMethod();
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 809676cfbe..a85af61327 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -71,7 +71,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
if(virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 575833a68f..be3ffcc698 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
if(body.passive && ((body.type == null) || body.type.length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
}
else
{
@@ -89,14 +89,14 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
}
}
}
else if (!exchange.getType().equals(body.type))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index bccb9db967..3c903b471d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
final QueueBindBody body = evt.getMethod();
final AMQQueue queue;
if (body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+// if (channel == null)
+// {
+// throw body.getChannelNotFoundException(evt.getChannelId());
+// }
+
+ queue = channel.getDefaultQueue();
+
if (queue == null)
{
- throw new AMQException("No default queue defined on channel and queue was null");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
+
if (body.routingKey == null)
{
body.routingKey = queue.getName();
@@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
final Exchange exch = exchangeRegistry.getExchange(body.exchange);
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ }
+ try
+ {
+ exch.registerQueue(body.routingKey, queue, body.arguments);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
}
- exch.registerQueue(body.routingKey, queue, body.arguments);
queue.bind(body.routingKey, exch);
if (_log.isInfoEnabled())
{
@@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 1e4b7c9e57..2218ff604f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
body.queue = createName();
}
- AMQQueue queue = null;
+ AMQQueue queue;
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
synchronized (queueRegistry)
@@ -94,8 +95,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if(body.passive)
{
String msg = "Queue: " + body.queue + " not found.";
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
-
+ throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
}
else
{
@@ -116,12 +116,18 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- // todo - constant
- throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ }
+
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
+
//set this as the default queue on the channel:
- session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ channel.setDefaultQueue(queue);
}
if (!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 4c875692f0..0c7de312a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -31,6 +32,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
}
else
{
@@ -76,19 +86,19 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
{
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
if(body.ifEmpty && !queue.isEmpty())
{
- throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." );
}
else if(body.ifUnused && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." );
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 3ccc61fff0..0c00436470 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -11,6 +11,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
}
-
}
}
else
@@ -62,12 +72,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
- long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index caf0efad67..3d7ec286f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -47,7 +47,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
_log.debug("Commit received on channel " + evt.getChannelId());
}
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
channel.processReturns(session);
}
- catch(AMQException e)
+ catch (AMQException e)
{
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 9088240351..8ce5a0ea73 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
- try{
+
+ try
+ {
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.rollback();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(session, false);
- }catch(AMQException e){
+ }
+ catch (AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 29795e50ca..a9e478e301 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
@@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
- session.getChannel(evt.getChannelId()).setLocalTransactional();
+
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
+ channel.setLocalTransactional();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e53410420f..309fa4663a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
+ //fixme what happens if getChannel returns null
getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
}
@@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
+ //fixme what happens if getChannel returns null
getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index 2049189e0f..c63490f019 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException
super("Immediate delivery is not possible.", message);
}
- public int getReplyCode()
+ public AMQConstant getReplyCode()
{
- return AMQConstant.NO_CONSUMERS.getCode();
+ return AMQConstant.NO_CONSUMERS;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 05841ccfc0..6bdfeccc0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -24,6 +24,8 @@ import java.util.Queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
@@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
/**
- * Encapsulation of a supscription to a queue.
- * <p/>
- * Ties together the protocol session of a subscriber, the consumer tag that
- * was given out by the broker and the channel id.
- * <p/>
+ * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
*/
public class SubscriptionImpl implements Subscription
{
@@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription
private final boolean _noLocal;
- /**
- * True if messages need to be acknowledged
- */
+ /** True if messages need to be acknowledged */
private final boolean _acks;
private FilterManager _filters;
private final boolean _isBrowser;
@@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
- {
- throw new NullPointerException("channel not found in protocol session");
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
}
this.channel = channel;
@@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /**
- * Equality holds if the session matches and the channel and consumer tag are the same.
- */
+ /** Equality holds if the session matches and the channel and consumer tag are the same. */
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
@@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription
}
/**
- * This method can be called by each of the publisher threads.
- * As a result all changes to the channel object must be thread safe.
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
*
* @param msg
* @param queue
+ *
* @throws AMQException
*/
public void send(AMQMessage msg, AMQQueue queue) throws AMQException
@@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription
}
queue.dequeue(storeContext, msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription
Object localInstance;
Object msgInstance;
- if((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
@@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- consumerTag // consumerTag
- ));
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 6d1e9ce99d..29efdd9513 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
@@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener
&& (protocolSession.getChannel(evt.getChannelId()) == null)
&& !protocolSession.channelAwaitingClosure(evt.getChannelId()))
{
- throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId());
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
}
}