diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 11:26:55 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 11:26:55 +0000 |
| commit | 21d2df094acb8530b2fb902b5ed9a1d7db8463fd (patch) | |
| tree | 2db927674c4edc9809348b078d38705fa24ee965 /java/broker/src | |
| parent | 225e38f7c110fca5b7e6f3738dfdad8de96cd50e (diff) | |
| download | qpid-python-21d2df094acb8530b2fb902b5ed9a1d7db8463fd.tar.gz | |
Merged revisions 540107 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r540107 | rupertlssmith | 2007-05-21 11:57:30 +0100 (Mon, 21 May 2007) | 1 line
Documented all exception.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540119 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
11 files changed, 218 insertions, 110 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index ff933d3c0b..d61bb8916a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -25,9 +25,17 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; /** - * Signals that a required delivery could not be made. This could be bacuse of - * the immediate flag being set and the queue having no consumers, or the mandatory - * flag being set and the exchange having no valid bindings. + * Signals that a required delivery could not be made. This could be bacuse of the immediate flag being set and the + * queue having no consumers, or the mandatory flag being set and the exchange having no valid bindings. + * + * <p/>The failed message is associated with this error condition, by taking a reference to it. This enables the + * correct compensating action to be taken against the message, for example, bouncing it back to the sender. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to deliver a message that must be delivered. + * <tr><td> Associate the failed message with the error condition. <td> {@link AMQMessage} + * </table> */ public abstract class RequiredDeliveryException extends AMQException { @@ -40,10 +48,10 @@ public abstract class RequiredDeliveryException extends AMQException // Increment the reference as this message is in the routing phase // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - _amqMessage = payload.takeReference(); - - //payload.incrementReference(); + // handler. So increment here. + _amqMessage = payload.takeReference(); + + // payload.incrementReference(); } public AMQMessage getAMQMessage() diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java index 6b2891c573..c77f114428 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,20 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +/** + * ExchangeInUseRegistry indicates that an exchange cannot be unregistered because it is currently being used. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents failure to unregister exchange that is in use. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo This exception is not used. However, it is part of the ExchangeRegistry interface, and looks like code is + * going to need to be added to throw/deal with this. Alternatively ExchangeResitries may be able to handle the + * issue internally. + */ public class ExchangeInUseException extends AMQException { public ExchangeInUseException(String exchangeName) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java index c972b9d078..1d6ab3842d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,8 +25,14 @@ import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; /** - * Thrown by an exchange if there is no way to route a message with the - * mandatory flag set. + * NoRouteException is a {@link RequiredDeliveryException} that represents the failure case where a manadatory message + * cannot be delivered because there is no route for the message. The AMQP status code, 312, is always used to report + * this condition. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to deliver a message that must be delivered. + * </table> */ public class NoRouteException extends RequiredDeliveryException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java index 16d74b6fc0..a7599a3e0d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -20,13 +20,25 @@ */
package org.apache.qpid.server.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
+/**
+ * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to handle an AMQP method.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a
+ * Runtime.
+ */
public class AMQNoMethodHandlerException extends AMQException
{
-
public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
{
super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java index 45d09e8f3e..6e72aa062f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -20,14 +20,27 @@ */
package org.apache.qpid.server.protocol;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+/**
+ * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to cast a frame to its expected type.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
+ * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
+ * first.
+ */
public class UnknnownMessageTypeException extends AMQException
{
public UnknnownMessageTypeException(AMQDataBlock message)
{
super("Unknown message type: " + message.getClass().getName() + ": " + message);
-
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index fa3b34a634..a17cbb87ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; @@ -54,7 +55,19 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable, StorableQueue { - + /** + * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription + * already exists. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ public static int s_queueID =0; public static final class ExistingExclusiveSubscription extends AMQException { @@ -65,19 +78,27 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } + /** + * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription + * already exists. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Move to top level, used outside this class. + */ public static final class ExistingSubscriptionPreventsExclusive extends AMQException { - public ExistingSubscriptionPreventsExclusive() { super(""); } } - private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription(); - private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive(); - - private static final Logger _logger = Logger.getLogger(AMQQueue.class); private final AMQShortString _name; @@ -134,7 +155,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue private final VirtualHost _virtualHost; - /** * max allowed size(KB) of a single message */ @@ -175,40 +195,34 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _name.compareTo(((AMQQueue) o).getName()); } - public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, VirtualHost virtualHost) - throws - AMQException + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) + throws AMQException { - this(name, durable, owner, autoDelete, virtualHost, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory()); + this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), + new SubscriptionSet(), new SubscriptionImpl.Factory()); } - - protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, VirtualHost virtualHost, - SubscriptionSet subscribers) - throws - AMQException + protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, + VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException { - this(name, durable, owner, autoDelete, virtualHost, - AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); + this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, + new SubscriptionImpl.Factory()); } - protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, - boolean autoDelete, VirtualHost virtualHost, - Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) - throws - AMQException + protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, + VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, + SubscriptionFactory subscriptionFactory) throws AMQException { if (name == null) { throw new IllegalArgumentException("Queue name must not be null"); } + if (virtualHost == null) { throw new IllegalArgumentException("Virtual Host must not be null"); } + _name = name; _durable = durable; _owner = owner; @@ -304,10 +318,11 @@ public class AMQQueue implements Managable, Comparable, StorableQueue public AMQMessage getMessageOnTheQueue(long messageId) { List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId); - if (list == null || list.size() == 0) + if ((list == null) || (list.size() == 0)) { return null; } + return list.get(0); } @@ -324,7 +339,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue * @param storeContext */ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - StoreContext storeContext) + StoreContext storeContext) { // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); @@ -460,6 +475,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue throw new AMQException("Problem binding queue ", e); } } + _bindings.addBinding(routingKey, arguments, exchange); } @@ -478,25 +494,23 @@ public class AMQQueue implements Managable, Comparable, StorableQueue throw new AMQException("problem unbinding queue", e); } } + _bindings.remove(routingKey, arguments, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) - throws - AMQException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { if (incrementSubscriberCount() > 1) { if (isExclusive()) { decrementSubscriberCount(); - throw EXISTING_EXCLUSIVE; + throw new ExistingExclusiveSubscription(); } else if (exclusive) { decrementSubscriberCount(); - throw EXISTING_SUBSCRIPTION; + throw new ExistingSubscriptionPreventsExclusive(); } } else if (exclusive) @@ -506,12 +520,13 @@ public class AMQQueue implements Managable, Comparable, StorableQueue if (_logger.isDebugEnabled()) { - _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + - "consumer tag {2} with {3}", ps, channel, consumerTag, this)); + _logger.debug(MessageFormat.format( + "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel, + consumerTag, this)); } - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, - filters, noLocal, this); + Subscription subscription = + _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); if (subscription.filtersMessages()) { @@ -524,7 +539,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _subscribers.addSubscriber(subscription); } - private boolean isExclusive() { return _isExclusive.get(); @@ -545,25 +559,25 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _subscriberCount.decrementAndGet(); } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { if (_logger.isDebugEnabled()) { - _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this)); + _logger.debug(MessageFormat.format( + "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, + consumerTag, this)); } Subscription removedSubscription; - if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, - ps, - consumerTag))) + + if ((removedSubscription = + _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) == null) { - throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + - " and protocol session key " + ps.getKey() + " not registered with queue " + this); + throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + + " and protocol session key " + ps.getKey() + " not registered with queue " + this); } removedSubscription.close(); @@ -577,6 +591,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue { _logger.info("Auto-deleteing queue:" + this); } + autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -594,7 +609,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return !_deliveryMgr.hasQueuedMessages(); } - public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException @@ -602,14 +616,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue if (checkUnused && !_subscribers.isEmpty()) { _logger.info("Will not delete " + this + " as it is in use."); + return 0; } else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) { _logger.info("Will not delete " + this + " as it is not empty."); + return 0; } else { delete(); + return _deliveryMgr.getQueueMessageCount(); } } @@ -628,6 +645,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue { task.doTask(this); } + _deleteTaskList.clear(); } } @@ -640,6 +658,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue { _logger.debug(MessageFormat.format("autodeleting {0}", this)); } + delete(); } @@ -647,7 +666,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue throws AMQException { - //fixme not sure what this is doing. should we be passing deliverFirst through here? + // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try @@ -663,10 +682,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } -// public DeliveryManager getDeliveryManager() -// { -// return _deliveryMgr; -// } + // public DeliveryManager getDeliveryManager() + // { + // return _deliveryMgr; + // } public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws @@ -696,10 +715,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } catch (MessageCleanupException e) { - //Message was dequeued, but could not then be deleted - //though it is no longer referenced. This should be very - //rare and can be detected and cleaned up on recovery or - //done through some form of manual intervention. + // Message was dequeued, but could not then be deleted + // though it is no longer referenced. This should be very + // rare and can be detected and cleaned up on recovery or + // done through some form of manual intervention. _logger.error(e, e); } catch (AMQException e) @@ -743,7 +762,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue { return true; } - if (o == null || getClass() != o.getClass()) + + if ((o == null) || (getClass() != o.getClass())) { return false; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java index b74c49e6e1..6466e81dd2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,18 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; /** - * Signals that the dequeue of a message from a queue failed + * Signals that the dequeue of a message from a queue failed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Indicates the a message could not be dequeued from a queue. + * <tr><td> + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo Happens as a consequence of a message store failure, or reference counting error. Both of which migh become + * runtime exceptions, as unrecoverable conditions? In which case this one might be dropped too. */ public class FailedDequeueException extends AMQException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java index 1e7e6f03d2..090096d3c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,8 +23,20 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; /** - * Signals that the removal of a message once its refcount reached - * zero failed. + * MessageCleanupException represents the failure to perform reference counting on messages correctly. This should not + * happen, but there may be programming errors giving race conditions that cause the reference counting to go wrong. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Signals that the reference count of a message has gone below zero. + * <tr><td> Indicates that a message store has lost a message which is still referenced. + * </table> + * + * @todo Not an AMQP exception as no status code. + * + * @todo The race conditions leading to this error should be cleaned up, and a runtime exception used instead. If the + * message store loses messages, then something is seriously wrong and it would be sensible to terminate the + * broker. This may be disguising out of memory errors. */ public class MessageCleanupException extends AMQException { @@ -32,6 +44,7 @@ public class MessageCleanupException extends AMQException { super("Failed to cleanup message with id " + messageId, e); } + public MessageCleanupException(String message) { super(message); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index c63490f019..d6fd1eec89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,9 +24,14 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.RequiredDeliveryException; /** - * Signals that no consumers exist for a message at a given point in time. - * Used if a message has immediate=true and there are no consumers registered - * with the queue. + * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate + * message cannot be delivered because there are presently no consumers for the message. The AMQP status code, 313, is + * always used to report this condition. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent failure to deliver a message that must be delivered. + * </table> */ public class NoConsumersException extends RequiredDeliveryException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 50129ec274..f96900d0a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.BasicAckBody; @@ -35,6 +36,7 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; @@ -55,7 +57,6 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -65,6 +66,7 @@ import org.apache.qpid.server.handler.BasicGetMethodHandler; import org.apache.qpid.server.handler.BasicPublishMethodHandler; import org.apache.qpid.server.handler.BasicQosHandler; import org.apache.qpid.server.handler.BasicRecoverMethodHandler; +import org.apache.qpid.server.handler.BasicRejectMethodHandler; import org.apache.qpid.server.handler.ChannelCloseHandler; import org.apache.qpid.server.handler.ChannelCloseOkHandler; import org.apache.qpid.server.handler.ChannelFlowHandler; @@ -83,9 +85,8 @@ import org.apache.qpid.server.handler.QueueDeclareHandler; import org.apache.qpid.server.handler.QueueDeleteHandler; import org.apache.qpid.server.handler.QueuePurgeHandler; import org.apache.qpid.server.handler.TxCommitHandler; -import org.apache.qpid.server.handler.BasicRejectMethodHandler; -import org.apache.qpid.server.handler.TxSelectHandler; import org.apache.qpid.server.handler.TxRollbackHandler; +import org.apache.qpid.server.handler.TxSelectHandler; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -107,18 +108,18 @@ public class AMQStateManager implements AMQMethodListener * AMQFrame. */ private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap = - new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class); - + new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>( + AMQState.class); private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, + AMQProtocolSession protocolSession) { _virtualHostRegistry = virtualHostRegistry; _protocolSession = protocolSession; @@ -220,37 +221,38 @@ public class AMQStateManager implements AMQMethodListener checkChannel(evt, _protocolSession); handler.methodReceived(this, evt); + return true; } + return false; } private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession) - throws AMQException + throws AMQException { - if (evt.getChannelId() != 0 - && !(evt.getMethod() instanceof ChannelOpenBody) - && (protocolSession.getChannel(evt.getChannelId()) == null) - && !protocolSession.channelAwaitingClosure(evt.getChannelId())) + if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody) + && (protocolSession.getChannel(evt.getChannelId()) == null) + && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } } protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState, - B frame) - throws IllegalStateTransitionException + B frame) + // throws IllegalStateTransitionException { - final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> - classToHandlerMap = _state2HandlersMap.get(currentState); + final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap = + _state2HandlersMap.get(currentState); - final StateAwareMethodListener<B> handler = classToHandlerMap == null - ? null - : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); + final StateAwareMethodListener<B> handler = + (classToHandlerMap == null) ? null : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); if (handler == null) { _logger.debug("No state transition handler defined for receiving frame " + frame); + return null; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java index 2d7cc27a85..cec67a8a6d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,11 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; +/** + * @todo Not an AMQP exception as no status code. + * + * @todo Not used! Delete. + */ public class IllegalStateTransitionException extends AMQException { private AMQState _originalState; @@ -30,8 +35,7 @@ public class IllegalStateTransitionException extends AMQException public IllegalStateTransitionException(AMQState originalState, Class frame) { - super("No valid state transition defined for receiving frame " + frame + - " from state " + originalState); + super("No valid state transition defined for receiving frame " + frame + " from state " + originalState); _originalState = originalState; _frame = frame; } |
