summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-05-21 11:26:55 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-05-21 11:26:55 +0000
commit21d2df094acb8530b2fb902b5ed9a1d7db8463fd (patch)
tree2db927674c4edc9809348b078d38705fa24ee965 /java/broker/src
parent225e38f7c110fca5b7e6f3738dfdad8de96cd50e (diff)
downloadqpid-python-21d2df094acb8530b2fb902b5ed9a1d7db8463fd.tar.gz
Merged revisions 540107 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r540107 | rupertlssmith | 2007-05-21 11:57:30 +0100 (Mon, 21 May 2007) | 1 line Documented all exception. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540119 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java136
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java12
11 files changed, 218 insertions, 110 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index ff933d3c0b..d61bb8916a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -25,9 +25,17 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.AMQMessage;
/**
- * Signals that a required delivery could not be made. This could be bacuse of
- * the immediate flag being set and the queue having no consumers, or the mandatory
- * flag being set and the exchange having no valid bindings.
+ * Signals that a required delivery could not be made. This could be bacuse of the immediate flag being set and the
+ * queue having no consumers, or the mandatory flag being set and the exchange having no valid bindings.
+ *
+ * <p/>The failed message is associated with this error condition, by taking a reference to it. This enables the
+ * correct compensating action to be taken against the message, for example, bouncing it back to the sender.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to deliver a message that must be delivered.
+ * <tr><td> Associate the failed message with the error condition. <td> {@link AMQMessage}
+ * </table>
*/
public abstract class RequiredDeliveryException extends AMQException
{
@@ -40,10 +48,10 @@ public abstract class RequiredDeliveryException extends AMQException
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- _amqMessage = payload.takeReference();
-
- //payload.incrementReference();
+ // handler. So increment here.
+ _amqMessage = payload.takeReference();
+
+ // payload.incrementReference();
}
public AMQMessage getAMQMessage()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
index 6b2891c573..c77f114428 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,20 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
+/**
+ * ExchangeInUseRegistry indicates that an exchange cannot be unregistered because it is currently being used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to unregister exchange that is in use.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo This exception is not used. However, it is part of the ExchangeRegistry interface, and looks like code is
+ * going to need to be added to throw/deal with this. Alternatively ExchangeResitries may be able to handle the
+ * issue internally.
+ */
public class ExchangeInUseException extends AMQException
{
public ExchangeInUseException(String exchangeName)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
index c972b9d078..1d6ab3842d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,8 +25,14 @@ import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
/**
- * Thrown by an exchange if there is no way to route a message with the
- * mandatory flag set.
+ * NoRouteException is a {@link RequiredDeliveryException} that represents the failure case where a manadatory message
+ * cannot be delivered because there is no route for the message. The AMQP status code, 312, is always used to report
+ * this condition.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to deliver a message that must be delivered.
+ * </table>
*/
public class NoRouteException extends RequiredDeliveryException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
index 16d74b6fc0..a7599a3e0d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -20,13 +20,25 @@
*/
package org.apache.qpid.server.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
+/**
+ * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to handle an AMQP method.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a
+ * Runtime.
+ */
public class AMQNoMethodHandlerException extends AMQException
{
-
public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
{
super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
index 45d09e8f3e..6e72aa062f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
@@ -20,14 +20,27 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+/**
+ * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to cast a frame to its expected type.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
+ * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
+ * first.
+ */
public class UnknnownMessageTypeException extends AMQException
{
public UnknnownMessageTypeException(AMQDataBlock message)
{
super("Unknown message type: " + message.getClass().getName() + ": " + message);
-
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index fa3b34a634..a17cbb87ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
@@ -54,7 +55,19 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
*/
public class AMQQueue implements Managable, Comparable, StorableQueue
{
-
+ /**
+ * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
public static int s_queueID =0;
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -65,19 +78,27 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
+ /**
+ * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
public static final class ExistingSubscriptionPreventsExclusive extends AMQException
{
-
public ExistingSubscriptionPreventsExclusive()
{
super("");
}
}
- private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription();
- private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
-
-
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
@@ -134,7 +155,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
private final VirtualHost _virtualHost;
-
/**
* max allowed size(KB) of a single message
*/
@@ -175,40 +195,34 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _name.compareTo(((AMQQueue) o).getName());
}
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, VirtualHost virtualHost)
- throws
- AMQException
+ public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
+ throws AMQException
{
- this(name, durable, owner, autoDelete, virtualHost,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionSet(), new SubscriptionImpl.Factory());
+ this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
+ new SubscriptionSet(), new SubscriptionImpl.Factory());
}
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, VirtualHost virtualHost,
- SubscriptionSet subscribers)
- throws
- AMQException
+ protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
+ VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
{
- this(name, durable, owner, autoDelete, virtualHost,
- AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory());
+ this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
+ new SubscriptionImpl.Factory());
}
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
- boolean autoDelete, VirtualHost virtualHost,
- Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory)
- throws
- AMQException
+ protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
+ VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+ SubscriptionFactory subscriptionFactory) throws AMQException
{
if (name == null)
{
throw new IllegalArgumentException("Queue name must not be null");
}
+
if (virtualHost == null)
{
throw new IllegalArgumentException("Virtual Host must not be null");
}
+
_name = name;
_durable = durable;
_owner = owner;
@@ -304,10 +318,11 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
public AMQMessage getMessageOnTheQueue(long messageId)
{
List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
- if (list == null || list.size() == 0)
+ if ((list == null) || (list.size() == 0))
{
return null;
}
+
return list.get(0);
}
@@ -324,7 +339,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
* @param storeContext
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
+ StoreContext storeContext)
{
// prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
@@ -460,6 +475,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
throw new AMQException("Problem binding queue ", e);
}
}
+
_bindings.addBinding(routingKey, arguments, exchange);
}
@@ -478,25 +494,23 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
throw new AMQException("problem unbinding queue", e);
}
}
+
_bindings.remove(routingKey, arguments, exchange);
}
-
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive)
- throws
- AMQException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
{
if (incrementSubscriberCount() > 1)
{
if (isExclusive())
{
decrementSubscriberCount();
- throw EXISTING_EXCLUSIVE;
+ throw new ExistingExclusiveSubscription();
} else if (exclusive)
{
decrementSubscriberCount();
- throw EXISTING_SUBSCRIPTION;
+ throw new ExistingSubscriptionPreventsExclusive();
}
} else if (exclusive)
@@ -506,12 +520,13 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (_logger.isDebugEnabled())
{
- _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " +
- "consumer tag {2} with {3}", ps, channel, consumerTag, this));
+ _logger.debug(MessageFormat.format(
+ "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel,
+ consumerTag, this));
}
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
- filters, noLocal, this);
+ Subscription subscription =
+ _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
if (subscription.filtersMessages())
{
@@ -524,7 +539,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_subscribers.addSubscriber(subscription);
}
-
private boolean isExclusive()
{
return _isExclusive.get();
@@ -545,25 +559,25 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _subscriberCount.decrementAndGet();
}
-
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag)
throws
AMQException
{
if (_logger.isDebugEnabled())
{
- _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
- this));
+ _logger.debug(MessageFormat.format(
+ "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel,
+ consumerTag, this));
}
Subscription removedSubscription;
- if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
- ps,
- consumerTag)))
+
+ if ((removedSubscription =
+ _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
== null)
{
- throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
- " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
+ + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
removedSubscription.close();
@@ -577,6 +591,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
{
_logger.info("Auto-deleteing queue:" + this);
}
+
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -594,7 +609,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return !_deliveryMgr.hasQueuedMessages();
}
-
public int delete(boolean checkUnused, boolean checkEmpty)
throws
AMQException
@@ -602,14 +616,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (checkUnused && !_subscribers.isEmpty())
{
_logger.info("Will not delete " + this + " as it is in use.");
+
return 0;
} else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
{
_logger.info("Will not delete " + this + " as it is not empty.");
+
return 0;
} else
{
delete();
+
return _deliveryMgr.getQueueMessageCount();
}
}
@@ -628,6 +645,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
{
task.doTask(this);
}
+
_deleteTaskList.clear();
}
}
@@ -640,6 +658,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
{
_logger.debug(MessageFormat.format("autodeleting {0}", this));
}
+
delete();
}
@@ -647,7 +666,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
throws
AMQException
{
- //fixme not sure what this is doing. should we be passing deliverFirst through here?
+ // fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
_deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
@@ -663,10 +682,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
-// public DeliveryManager getDeliveryManager()
-// {
-// return _deliveryMgr;
-// }
+ // public DeliveryManager getDeliveryManager()
+ // {
+ // return _deliveryMgr;
+ // }
public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
throws
@@ -696,10 +715,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
catch (MessageCleanupException e)
{
- //Message was dequeued, but could not then be deleted
- //though it is no longer referenced. This should be very
- //rare and can be detected and cleaned up on recovery or
- //done through some form of manual intervention.
+ // Message was dequeued, but could not then be deleted
+ // though it is no longer referenced. This should be very
+ // rare and can be detected and cleaned up on recovery or
+ // done through some form of manual intervention.
_logger.error(e, e);
}
catch (AMQException e)
@@ -743,7 +762,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
{
return true;
}
- if (o == null || getClass() != o.getClass())
+
+ if ((o == null) || (getClass() != o.getClass()))
{
return false;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
index b74c49e6e1..6466e81dd2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,18 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
/**
- * Signals that the dequeue of a message from a queue failed
+ * Signals that the dequeue of a message from a queue failed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Indicates the a message could not be dequeued from a queue.
+ * <tr><td>
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Happens as a consequence of a message store failure, or reference counting error. Both of which migh become
+ * runtime exceptions, as unrecoverable conditions? In which case this one might be dropped too.
*/
public class FailedDequeueException extends AMQException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
index 1e7e6f03d2..090096d3c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,20 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
/**
- * Signals that the removal of a message once its refcount reached
- * zero failed.
+ * MessageCleanupException represents the failure to perform reference counting on messages correctly. This should not
+ * happen, but there may be programming errors giving race conditions that cause the reference counting to go wrong.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Signals that the reference count of a message has gone below zero.
+ * <tr><td> Indicates that a message store has lost a message which is still referenced.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo The race conditions leading to this error should be cleaned up, and a runtime exception used instead. If the
+ * message store loses messages, then something is seriously wrong and it would be sensible to terminate the
+ * broker. This may be disguising out of memory errors.
*/
public class MessageCleanupException extends AMQException
{
@@ -32,6 +44,7 @@ public class MessageCleanupException extends AMQException
{
super("Failed to cleanup message with id " + messageId, e);
}
+
public MessageCleanupException(String message)
{
super(message);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index c63490f019..d6fd1eec89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,9 +24,14 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.RequiredDeliveryException;
/**
- * Signals that no consumers exist for a message at a given point in time.
- * Used if a message has immediate=true and there are no consumers registered
- * with the queue.
+ * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate
+ * message cannot be delivered because there are presently no consumers for the message. The AMQP status code, 313, is
+ * always used to report this condition.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to deliver a message that must be delivered.
+ * </table>
*/
public class NoConsumersException extends RequiredDeliveryException
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 50129ec274..f96900d0a9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicAckBody;
@@ -35,6 +36,7 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
@@ -55,7 +57,6 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
@@ -65,6 +66,7 @@ import org.apache.qpid.server.handler.BasicGetMethodHandler;
import org.apache.qpid.server.handler.BasicPublishMethodHandler;
import org.apache.qpid.server.handler.BasicQosHandler;
import org.apache.qpid.server.handler.BasicRecoverMethodHandler;
+import org.apache.qpid.server.handler.BasicRejectMethodHandler;
import org.apache.qpid.server.handler.ChannelCloseHandler;
import org.apache.qpid.server.handler.ChannelCloseOkHandler;
import org.apache.qpid.server.handler.ChannelFlowHandler;
@@ -83,9 +85,8 @@ import org.apache.qpid.server.handler.QueueDeclareHandler;
import org.apache.qpid.server.handler.QueueDeleteHandler;
import org.apache.qpid.server.handler.QueuePurgeHandler;
import org.apache.qpid.server.handler.TxCommitHandler;
-import org.apache.qpid.server.handler.BasicRejectMethodHandler;
-import org.apache.qpid.server.handler.TxSelectHandler;
import org.apache.qpid.server.handler.TxRollbackHandler;
+import org.apache.qpid.server.handler.TxSelectHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -107,18 +108,18 @@ public class AMQStateManager implements AMQMethodListener
* AMQFrame.
*/
private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
- new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class);
-
+ new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
+ AMQState.class);
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
-
public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry,
+ AMQProtocolSession protocolSession)
{
_virtualHostRegistry = virtualHostRegistry;
_protocolSession = protocolSession;
@@ -220,37 +221,38 @@ public class AMQStateManager implements AMQMethodListener
checkChannel(evt, _protocolSession);
handler.methodReceived(this, evt);
+
return true;
}
+
return false;
}
private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
- throws AMQException
+ throws AMQException
{
- if (evt.getChannelId() != 0
- && !(evt.getMethod() instanceof ChannelOpenBody)
- && (protocolSession.getChannel(evt.getChannelId()) == null)
- && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
+ if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody)
+ && (protocolSession.getChannel(evt.getChannelId()) == null)
+ && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
{
throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
}
}
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
- B frame)
- throws IllegalStateTransitionException
+ B frame)
+ // throws IllegalStateTransitionException
{
- final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>
- classToHandlerMap = _state2HandlersMap.get(currentState);
+ final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap =
+ _state2HandlersMap.get(currentState);
- final StateAwareMethodListener<B> handler = classToHandlerMap == null
- ? null
- : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+ final StateAwareMethodListener<B> handler =
+ (classToHandlerMap == null) ? null : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
if (handler == null)
{
_logger.debug("No state transition handler defined for receiving frame " + frame);
+
return null;
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
index 2d7cc27a85..cec67a8a6d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,11 @@ package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
+/**
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Not used! Delete.
+ */
public class IllegalStateTransitionException extends AMQException
{
private AMQState _originalState;
@@ -30,8 +35,7 @@ public class IllegalStateTransitionException extends AMQException
public IllegalStateTransitionException(AMQState originalState, Class frame)
{
- super("No valid state transition defined for receiving frame " + frame +
- " from state " + originalState);
+ super("No valid state transition defined for receiving frame " + frame + " from state " + originalState);
_originalState = originalState;
_frame = frame;
}