summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-05-21 15:11:23 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-05-21 15:11:23 +0000
commit625e140b590838df60d603f42d552a9275aae2ca (patch)
tree6b0acb350f3ead0da52b0301bfee74101d6bb7d4 /java/broker
parent21d2df094acb8530b2fb902b5ed9a1d7db8463fd (diff)
downloadqpid-python-625e140b590838df60d603f42d552a9275aae2ca.tar.gz
Refactored exceptions to have single constructors and made room for wrapped causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java559
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java157
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java6
37 files changed, 456 insertions, 540 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
index adec1b348d..61638e5e26 100644
--- a/java/broker/src/main/grammar/SelectorParser.jj
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -94,7 +94,7 @@ public class SelectorParser {
return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+ throw new AMQInvalidArgumentException(sql, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1de4d16ad4..5de59f47a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -212,7 +212,7 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ throw new AMQException(null, "Received content header without previously receiving a BasicPublish frame", null);
}
else
{
@@ -239,7 +239,7 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+ throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null);
}
if (_log.isTraceEnabled())
@@ -883,7 +883,7 @@ public class AMQChannel
{
if (!isTransactional())
{
- throw new AMQException("Fatal error: commit called on non-transactional channel");
+ throw new AMQException(null, "Fatal error: commit called on non-transactional channel", null);
}
_txnContext.commit();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
index 9a98af5689..3253650d14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.server;
-public class ConsumerTagNotUniqueException extends Exception
-{
-}
+/**
+ * ConsumerTagNotUniqueException indicates that a client has attempted to connect with a consumer tag that is already
+ * used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents error when clients connects with a non-unique tag.
+ * </table>
+ *
+ * @todo Consider replacing with an AMQNotAllowedException, as this is the status code returned when this happens.
+ */
+public class ConsumerTagNotUniqueException extends Exception
+{ }
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index d61bb8916a..37c5f38ea3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -41,9 +41,9 @@ public abstract class RequiredDeliveryException extends AMQException
{
private final AMQMessage _amqMessage;
- public RequiredDeliveryException(String message, AMQMessage payload)
+ public RequiredDeliveryException(String message, AMQMessage payload, Throwable cause)
{
- super(message);
+ super(null, message, cause);
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 30bbdea2ef..1604d94539 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -181,8 +181,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if (unacked.getKey() > deliveryTag)
{
//This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
- " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+ throw new AMQException(null, "UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+ " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString(), null);
}
it.remove();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index c349b44d6d..39a5bba8b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -55,7 +55,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
if (exchClass == null)
{
- throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
+ throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null);
}
try
{
@@ -65,11 +65,11 @@ public class DefaultExchangeFactory implements ExchangeFactory
}
catch (InstantiationException e)
{
- throw new AMQException("Unable to create exchange: " + e, e);
+ throw new AMQException(null, "Unable to create exchange: " + e, e);
}
catch (IllegalAccessException e)
{
- throw new AMQException("Unable to create exchange: " + e, e);
+ throw new AMQException(null, "Unable to create exchange: " + e, e);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 9066af70d9..f3bdecc32e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -71,7 +71,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
getMessageStore().createExchange(exchange);
} catch (InternalErrorException e)
{
- throw new AMQException("problem registering excahgne " + exchange, e);
+ throw new AMQException(null, "problem registering excahgne " + exchange, e);
}
}
}
@@ -99,14 +99,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
getMessageStore().removeExchange(e);
} catch (InternalErrorException e1)
{
- throw new AMQException("Problem unregistering Exchange " + name, e1);
+ throw new AMQException(null, "Problem unregistering Exchange " + name, e1);
}
}
e.close();
}
else
{
- throw new AMQException("Unknown exchange " + name);
+ throw new AMQException(null, "Unknown exchange " + name, null);
}
}
@@ -138,7 +138,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
// TODO: check where the exchange is validated
if (exch == null)
{
- throw new AMQException("Exchange '" + exchange + "' does not exist");
+ throw new AMQException(null, "Exchange '" + exchange + "' does not exist", null);
}
exch.route(payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index ab103fbd2a..01242f90de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -126,7 +126,7 @@ public class DestNameExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -156,8 +156,8 @@ public class DestNameExchange extends AbstractExchange
if (!_index.remove(routingKey, queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
}
@@ -171,7 +171,7 @@ public class DestNameExchange extends AbstractExchange
String msg = "Routing key " + routingKey + " is not known to " + this;
if (info.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index e9c5b0024c..25ec0c3a2d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -216,7 +216,7 @@ public class DestWildExchange extends AbstractExchange
if (info.isMandatory())
{
String msg = "Topic " + routingKey + " is not known to " + this;
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
@@ -276,15 +276,15 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey);
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey, null);
}
if (queues.isEmpty())
{
@@ -301,7 +301,7 @@ public class DestWildExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the topic exchenge mbean", ex);
- throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the topic exchenge mbean", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
index c77f114428..07550dd808 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
@@ -38,8 +38,8 @@ import org.apache.qpid.AMQException;
*/
public class ExchangeInUseException extends AMQException
{
- public ExchangeInUseException(String exchangeName)
+ public ExchangeInUseException(String exchangeName, Throwable cause)
{
- super("Exchange " + exchangeName + " is currently in use");
+ super(null, "Exchange " + exchangeName + " is currently in use", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index b3690d3e10..28d4b19f2e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -98,7 +98,7 @@ public class FanoutExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -129,8 +129,8 @@ public class FanoutExchange extends AbstractExchange
if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ ". ", null);
}
}
@@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange
String msg = "No queues bound to " + this;
if (publishInfo.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index b4b2bc20bc..8205924207 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -231,7 +231,7 @@ public class HeadersExchange extends AbstractExchange
if (payload.getMessagePublishInfo().isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
@@ -284,7 +284,7 @@ public class HeadersExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
- throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
+ throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
index 1d6ab3842d..c787103c00 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
@@ -36,9 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public class NoRouteException extends RequiredDeliveryException
{
- public NoRouteException(String msg, AMQMessage message)
+ public NoRouteException(String msg, AMQMessage message, Throwable cause)
{
- super(msg, message);
+ super(msg, message, cause);
}
public AMQConstant getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 56eae279dc..9346eecbb2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ExistingExclusiveSubscriptionException;
+import org.apache.qpid.server.queue.ExistingSubscriptionPreventsExclusiveException;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -146,14 +148,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg)); // replyText
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (ExistingExclusiveSubscriptionException e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (ExistingSubscriptionPreventsExclusiveException e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index fef00942a0..43986adea7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -69,7 +69,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
SaslServer ss = session.getSaslServer();
if (ss == null)
{
- throw new AMQException("No SASL context set up in session");
+ throw new AMQException(null, "No SASL context set up in session", null);
}
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 29d6c26b66..5dbd1b18de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -138,7 +138,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
catch (SaslException e)
{
disposeSaslServer(session);
- throw new AMQException("SASL error: " + e, e);
+ throw new AMQException(null, "SASL error: " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 2b123bcb2d..f24c96f87f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -79,7 +79,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQShortString routingKey = body.routingKey;
if (exchangeName == null)
{
- throw new AMQException("Exchange exchange must not be null");
+ throw new AMQException(null, "Exchange exchange must not be null", null);
}
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
AMQFrame response;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index be3ffcc698..855d1a2add 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -96,7 +96,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
else if (!exchange.getType().equals(body.type))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index ec9041c309..f9e94af697 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -110,7 +110,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
store.createQueue(queue);
} catch (Exception e)
{
- throw new AMQException("Problem when creating queue " + queue, e);
+ throw new AMQException(null, "Problem when creating queue " + queue, e);
}
}
queueRegistry.registerQueue(queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index cfea4637ab..eb89bf78e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -114,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
store.destroyQueue(queue);
} catch (Exception e)
{
- throw new AMQException("problem when destroying queue " + queue, e);
+ throw new AMQException(null, "problem when destroying queue " + queue, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
index 84526dbc11..31313cf024 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
@@ -71,7 +71,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
catch (JMException e)
{
- throw new AMQException("Error registering managed object " + this + ": " + e, e);
+ throw new AMQException(null, "Error registering managed object " + this + ": " + e, e);
}
}
@@ -88,7 +88,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
catch (JMException e)
{
- throw new AMQException("Error unregistering managed object: " + this + ": " + e, e);
+ throw new AMQException(null, "Error unregistering managed object: " + this + ": " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d430f1af94..82e969b496 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -168,7 +168,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
+ throw new AMQException(null, "AMQProtocolSession MBean creation has failed ", ex);
}
}
@@ -199,7 +199,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
else
{
- throw new UnknnownMessageTypeException(message);
+ throw new UnknnownMessageTypeException(message, null);
}
}
@@ -321,7 +321,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
if (!wasAnyoneInterested)
{
- throw new AMQNoMethodHandlerException(evt);
+ throw new AMQNoMethodHandlerException(evt, null);
}
}
catch (AMQChannelException e)
@@ -425,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQChannel channel = getChannel(channelId);
if (channel == null)
{
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
}
return channel;
}
@@ -454,14 +454,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
if (_closed)
{
- throw new AMQException("Session is closed");
+ throw new AMQException(null, "Session is closed", null);
}
final int channelId = channel.getChannelId();
if (_closingChannelsList.contains(channelId))
{
- throw new AMQException("Session is marked awaiting channel close");
+ throw new AMQException(null, "Session is marked awaiting channel close", null);
}
if (_channelMap.size() == _maxNoOfChannels)
@@ -469,7 +469,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String errorMessage = toString() + ": maximum number of channels has been reached (" +
_maxNoOfChannels + "); can't create channel";
_logger.error(errorMessage);
- throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
+ throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
index a7599a3e0d..ee6e090e24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -39,8 +39,8 @@ import org.apache.qpid.protocol.AMQMethodEvent;
*/
public class AMQNoMethodHandlerException extends AMQException
{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt, Throwable cause)
{
- super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ super(null, "AMQMethodEvent " + evt + " was not processed by any listener on Broker.", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
index 6e72aa062f..d053884e69 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
@@ -39,8 +39,8 @@ import org.apache.qpid.framing.AMQDataBlock;
*/
public class UnknnownMessageTypeException extends AMQException
{
- public UnknnownMessageTypeException(AMQDataBlock message)
+ public UnknnownMessageTypeException(AMQDataBlock message, Throwable cause)
{
- super("Unknown message type: " + message.getClass().getName() + ": " + message);
+ super(null, "Unknown message type: " + message.getClass().getName() + ": " + message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 6ffe1af018..95f75fdb36 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,31 +20,33 @@
*/
package org.apache.qpid.server.queue;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/** Combines the information that make up a deliverable message into a more manageable form. */
+
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.messageStore.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionalContext;
/**
* Combines the information that make up a deliverable message into a more manageable form.
@@ -56,7 +58,7 @@ public class AMQMessage implements StorableMessage
// The ordered list of queues into which this message is enqueued.
private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
// Indicates whether this message is staged
- private boolean _isStaged = false;
+ private boolean _isStaged = false;
/**
* Used in clustering
@@ -89,18 +91,15 @@ public class AMQMessage implements StorableMessage
*/
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
-
private Set<Subscription> _rejectedBy = null;
-
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
-
private final int hashcode = System.identityHashCode(this);
private long _expiration;
@@ -111,8 +110,10 @@ public class AMQMessage implements StorableMessage
public void setExpiration()
{
- long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
- long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ long expiration =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
{
@@ -125,10 +126,10 @@ public class AMQMessage implements StorableMessage
{
if (timestamp != 0L)
{
- //todo perhaps use arrival time
+ // todo perhaps use arrival time
long diff = (System.currentTimeMillis() - timestamp);
- if (diff > 1000L || diff < 1000L)
+ if ((diff > 1000L) || (diff < 1000L))
{
_expiration = expiration + diff;
}
@@ -159,11 +160,12 @@ public class AMQMessage implements StorableMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Unable to get body count: " + e, e);
+
return false;
}
}
@@ -173,7 +175,10 @@ public class AMQMessage implements StorableMessage
try
{
- AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
+
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -209,11 +214,12 @@ public class AMQMessage implements StorableMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Error getting body count: " + e, e);
+
return false;
}
}
@@ -236,8 +242,7 @@ public class AMQMessage implements StorableMessage
}
}
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext)
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
@@ -257,8 +262,7 @@ public class AMQMessage implements StorableMessage
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws
- AMQException
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(store, this, true);
@@ -274,10 +278,8 @@ public class AMQMessage implements StorableMessage
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext, ContentHeaderBody contentHeader)
- throws
- AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -294,13 +296,9 @@ public class AMQMessage implements StorableMessage
* @param contentBodies
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
- MessageHandleFactory messageHandleFactory)
- throws
- AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -311,9 +309,7 @@ public class AMQMessage implements StorableMessage
}
}
- protected AMQMessage(AMQMessage msg)
- throws
- AMQException
+ protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageId = msg._messageId;
_messageHandle = msg._messageHandle;
@@ -322,9 +318,9 @@ public class AMQMessage implements StorableMessage
_transientMessageData = msg._transientMessageData;
}
- //========================================================================
+ // ========================================================================
// Interface StorableMessage
- //========================================================================
+ // ========================================================================
public long getMessageId()
{
@@ -342,10 +338,12 @@ public class AMQMessage implements StorableMessage
result = new byte[headerBody.getSize()];
bufferedResult = ByteBuffer.wrap(result);
headerBody.writePayload(bufferedResult);
- } catch (AMQException e)
+ }
+ catch (AMQException e)
{
_log.error("Error when getting message header", e);
}
+
return result;
}
@@ -355,10 +353,12 @@ public class AMQMessage implements StorableMessage
try
{
result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize();
- } catch (AMQException e)
+ }
+ catch (AMQException e)
{
_log.error("Error when getting message header size", e);
}
+
return result;
}
@@ -372,7 +372,7 @@ public class AMQMessage implements StorableMessage
return _messageHandle.getMessagePayload().length;
}
- public boolean isEnqueued()
+ public boolean isEnqueued()
{
return _queues.size() > 0;
}
@@ -401,6 +401,7 @@ public class AMQMessage implements StorableMessage
{
_log.debug("The queue position is " + _queues.indexOf(queue));
}
+
return _queues.indexOf(queue);
}
@@ -424,44 +425,40 @@ public class AMQMessage implements StorableMessage
return new BodyContentIterator();
}
- public ContentHeaderBody getContentHeaderBody()
- throws
- AMQException
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.getContentHeaderBody();
- } else
+ }
+ else
{
return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- throws
- AMQException
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
- throws
- AMQException
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(store, this, persistent);
- //if (persistent)
- // {
- _txnContext.beginTranIfNecessary();
- // }
+ // if (persistent)
+ // {
+ _txnContext.beginTranIfNecessary();
+ // }
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- // for (AMQQueue q : _transientMessageData.getDestinationQueues())
- // {
- // _messageHandle.enqueue(storeContext, _messageId, q);
- // }
+ // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ // {
+ // _messageHandle.enqueue(storeContext, _messageId, q);
+ // }
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
@@ -469,9 +466,7 @@ public class AMQMessage implements StorableMessage
}
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk)
- throws
- AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
{
_transientMessageData.addBodyLength(contentChunk.getSize());
final boolean allContentReceived = isAllContentReceived();
@@ -479,21 +474,20 @@ public class AMQMessage implements StorableMessage
if (allContentReceived)
{
deliver(storeContext);
+
return true;
- } else
+ }
+ else
{
return false;
}
}
- public boolean isAllContentReceived()
- throws
- AMQException
+ public boolean isAllContentReceived() throws AMQException
{
return _transientMessageData.isAllContentReceived();
}
-
/**
* Creates a long-lived reference to this message, and increments the count of such references, as an atomic
* operation.
@@ -501,6 +495,7 @@ public class AMQMessage implements StorableMessage
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
+
return this;
}
@@ -510,10 +505,10 @@ public class AMQMessage implements StorableMessage
protected void incrementReference()
{
_referenceCount.incrementAndGet();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
}
/**
@@ -524,9 +519,7 @@ public class AMQMessage implements StorableMessage
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference(StoreContext storeContext)
- throws
- MessageCleanupException
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
int count = _referenceCount.decrementAndGet();
@@ -538,10 +531,10 @@ public class AMQMessage implements StorableMessage
{
try
{
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
@@ -552,15 +545,17 @@ public class AMQMessage implements StorableMessage
}
catch (AMQException e)
{
- //to maintain consistency, we revert the count
+ // to maintain consistency, we revert the count
incrementReference();
- throw new MessageCleanupException(_messageId, e);
+ throw new MessageCleanupException("Failed to cleanup message with id " + _messageId, e);
}
- } else
+ }
+ else
{
if (count < 0)
{
- throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.",
+ null);
}
}
}
@@ -587,7 +582,7 @@ public class AMQMessage implements StorableMessage
public boolean isTaken(AMQQueue queue)
{
- //return _taken.get();
+ // return _taken.get();
synchronized (this)
{
@@ -604,15 +599,15 @@ public class AMQMessage implements StorableMessage
public boolean taken(AMQQueue queue, Subscription sub)
{
-// if (_taken.getAndSet(true))
-// {
-// return true;
-// }
-// else
-// {
-// _takenBySubcription = sub;
-// return false;
-// }
+ // if (_taken.getAndSet(true))
+ // {
+ // return true;
+ // }
+ // else
+ // {
+ // _takenBySubcription = sub;
+ // return false;
+ // }
synchronized (this)
{
@@ -625,10 +620,12 @@ public class AMQMessage implements StorableMessage
if (taken.getAndSet(true))
{
return true;
- } else
+ }
+ else
{
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, sub);
+
return false;
}
}
@@ -641,9 +638,8 @@ public class AMQMessage implements StorableMessage
_log.trace("Releasing Message:" + debugIdentity());
}
-// _taken.set(false);
-// _takenBySubcription = null;
-
+ // _taken.set(false);
+ // _takenBySubcription = null;
synchronized (this)
{
@@ -651,7 +647,8 @@ public class AMQMessage implements StorableMessage
if (taken == null)
{
taken = new AtomicBoolean(false);
- } else
+ }
+ else
{
taken.set(false);
}
@@ -672,9 +669,11 @@ public class AMQMessage implements StorableMessage
if (_tokens.contains(token))
{
return true;
- } else
+ }
+ else
{
_tokens.add(token);
+
return false;
}
}
@@ -687,28 +686,23 @@ public class AMQMessage implements StorableMessage
* @param queue the queue
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
- public void enqueue(AMQQueue queue)
- throws
- AMQException
+ public void enqueue(AMQQueue queue) throws AMQException
{
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue)
- throws
- AMQException
+ public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
- public boolean isPersistent()
- throws
- AMQException
+ public boolean isPersistent() throws AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.isPersistent();
- } else
+ }
+ else
{
return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
@@ -720,29 +714,27 @@ public class AMQMessage implements StorableMessage
* @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer()
- throws
- NoConsumersException
+ public void checkDeliveredToConsumer() throws NoConsumersException
{
if (_immediate && !_deliveredToConsumer)
{
- throw new NoConsumersException(this);
+ throw new NoConsumersException(this, null);
}
}
- public MessagePublishInfo getMessagePublishInfo()
- throws
- AMQException
+ public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
MessagePublishInfo pb;
if (_transientMessageData != null)
{
pb = _transientMessageData.getMessagePublishInfo();
- } else
+ }
+ else
{
pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
+
return pb;
}
@@ -773,7 +765,7 @@ public class AMQMessage implements StorableMessage
*/
public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
{
- //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+ // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -782,6 +774,7 @@ public class AMQMessage implements StorableMessage
if (now > _expiration)
{
dequeue(storecontext, queue);
+
return true;
}
}
@@ -795,9 +788,7 @@ public class AMQMessage implements StorableMessage
_deliveredToConsumer = true;
}
- private void deliver(StoreContext storeContext)
- throws
- AMQException
+ private void deliver(StoreContext storeContext) throws AMQException
{
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
@@ -806,12 +797,13 @@ public class AMQMessage implements StorableMessage
{
_log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
+
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
- _transientMessageData.getContentHeaderBody());
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
+ _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -821,9 +813,9 @@ public class AMQMessage implements StorableMessage
for (AMQQueue q : destinationQueues)
{
- //Increment the references to this message for each queue delivery.
+ // Increment the references to this message for each queue delivery.
incrementReference();
- //normal deliver so add this message at the end.
+ // normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
}
@@ -835,182 +827,181 @@ public class AMQMessage implements StorableMessage
}
}
-/*
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ /*
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
- protocolSession.writeFrame(compositeBlock);
}
- else
+
+ public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
- }
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+ deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- protocolSession.writeFrame(compositeBlock);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
- else
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
{
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
//
// Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
//
- for (int i = 1; i < bodyCount; i++)
+ while (bodyFrameIterator.hasNext())
{
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ protocolSession.writeFrame(bodyFrameIterator.next());
}
-
-
}
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- getMessagePublishInfo().getExchange(),
- replyCode, replyText,
- getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
- }
- }
-*/
+ */
public AMQMessageHandle getMessageHandle()
{
return _messageHandle;
}
-
public long getSize()
{
try
@@ -1022,15 +1013,13 @@ public class AMQMessage implements StorableMessage
catch (AMQException e)
{
_log.error(e.toString(), e);
+
return 0;
}
}
-
- public void restoreTransientMessageData()
- throws
- AMQException
+ public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
@@ -1039,25 +1028,23 @@ public class AMQMessage implements StorableMessage
_transientMessageData = transientMessageData;
}
-
public void clearTransientMessageData()
{
_transientMessageData = null;
}
-
public String toString()
{
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-// _taken + " by :" + _takenBySubcription;
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
+ + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
-// return _takenBySubcription;
+ // return _takenBySubcription;
synchronized (this)
{
return _takenBySubcriptionMap.get(queue);
@@ -1074,7 +1061,8 @@ public class AMQMessage implements StorableMessage
}
_rejectedBy.add(subscription);
- } else
+ }
+ else
{
_log.warn("Requesting rejection by null subscriber:" + debugIdentity());
}
@@ -1084,10 +1072,11 @@ public class AMQMessage implements StorableMessage
{
boolean rejected = _rejectedBy != null;
- if (rejected) // We have subscriptions that rejected this message
+ if (rejected) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
- } else // This messasge hasn't been rejected yet.
+ }
+ else // This messasge hasn't been rejected yet.
{
return rejected;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a17cbb87ff..a803ef1227 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.List;
-import java.util.Hashtable;
import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,11 +40,11 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.messageStore.StorableMessage;
-import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -55,49 +55,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
*/
public class AMQQueue implements Managable, Comparable, StorableQueue
{
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static int s_queueID =0;
- public static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
+ public static int s_queueID = 0;
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
@@ -108,7 +66,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// The list of enqueued messages.
Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>();
-
/**
* null means shared
*/
@@ -236,12 +193,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- _queueId = s_queueID++;
+ _queueId = s_queueID++;
}
- private AMQQueueMBean createMBean()
- throws
- AMQException
+ private AMQQueueMBean createMBean() throws AMQException
{
try
{
@@ -249,7 +204,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
catch (JMException ex)
{
- throw new AMQException("AMQQueue MBean creation has failed ", ex);
+ throw new AMQException(null, "AMQQueue MBean creation has failed ", ex);
}
}
@@ -443,9 +398,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
/**
* Removes the AMQMessage from the top of the queue.
*/
- public synchronized void deleteMessageFromTop(StoreContext storeContext)
- throws
- AMQException
+ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
@@ -453,16 +406,12 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
/**
* removes all the messages from the queue.
*/
- public synchronized long clearQueue(StoreContext storeContext)
- throws
- AMQException
+ public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- throws
- AMQException
+ public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
@@ -470,18 +419,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
try
{
_virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- } catch (InternalErrorException e)
+ }
+ catch (InternalErrorException e)
{
- throw new AMQException("Problem binding queue ", e);
+ throw new AMQException(null, "Problem binding queue ", e);
}
}
_bindings.addBinding(routingKey, arguments, exchange);
}
- public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- throws
- AMQException
+ public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
@@ -489,9 +437,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
try
{
_virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- } catch (InternalErrorException e)
+ }
+ catch (InternalErrorException e)
{
- throw new AMQException("problem unbinding queue", e);
+ throw new AMQException(null, "problem unbinding queue", e);
}
}
@@ -506,14 +455,16 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (isExclusive())
{
decrementSubscriberCount();
- throw new ExistingExclusiveSubscription();
- } else if (exclusive)
+ throw new ExistingExclusiveSubscriptionException();
+ }
+ else if (exclusive)
{
decrementSubscriberCount();
- throw new ExistingSubscriptionPreventsExclusive();
+ throw new ExistingSubscriptionPreventsExclusiveException();
}
- } else if (exclusive)
+ }
+ else if (exclusive)
{
setExclusive(true);
}
@@ -559,9 +510,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _subscriberCount.decrementAndGet();
}
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag)
- throws
- AMQException
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
if (_logger.isDebugEnabled())
{
@@ -576,8 +525,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
== null)
{
- throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag
+ + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
}
removedSubscription.close();
@@ -609,21 +558,21 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return !_deliveryMgr.hasQueuedMessages();
}
- public int delete(boolean checkUnused, boolean checkEmpty)
- throws
- AMQException
+ public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
{
if (checkUnused && !_subscribers.isEmpty())
{
_logger.info("Will not delete " + this + " as it is in use.");
return 0;
- } else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
+ }
+ else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
{
_logger.info("Will not delete " + this + " as it is not empty.");
return 0;
- } else
+ }
+ else
{
delete();
@@ -631,9 +580,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- public void delete()
- throws
- AMQException
+ public void delete() throws AMQException
{
if (!_deleted.getAndSet(true))
{
@@ -650,9 +597,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- protected void autodelete()
- throws
- AMQException
+ protected void autodelete() throws AMQException
{
if (_logger.isDebugEnabled())
{
@@ -662,9 +607,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
- throws
- AMQException
+ public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -687,9 +630,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// return _deliveryMgr;
// }
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
- throws
- AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
_deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
@@ -705,9 +646,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg)
- throws
- FailedDequeueException
+ void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
@@ -737,9 +676,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg)
- throws
- AMQException
+ protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
{
if (!msg.isRedelivered())
{
@@ -752,7 +689,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
catch (JMException e)
{
- throw new AMQException("Unable to get notification from manage queue: " + e, e);
+ throw new AMQException(null, "Unable to get notification from manage queue: " + e, e);
}
}
@@ -783,9 +720,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks)
- throws
- AMQException
+ public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
{
return _deliveryMgr.performGet(session, channel, acks);
}
@@ -802,9 +737,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
public static interface Task
{
- public void doTask(AMQQueue queue)
- throws
- AMQException;
+ public void doTask(AMQQueue queue) throws AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -837,9 +770,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
}
- //========================================================================
+ // ========================================================================
// Interface StorableQueue
- //========================================================================
+ // ========================================================================
public int getQueueID()
{
@@ -861,9 +794,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_messages.put(m.getMessageId(), m);
}
- //========================================================================
+ // ========================================================================
// Used by the Store
- //========================================================================
+ // ========================================================================
/**
* Get the list of enqueud messages
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java
new file mode 100644
index 0000000000..a5ff9e6326
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java
@@ -0,0 +1,22 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingExclusiveSubscriptionException signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingExclusiveSubscriptionException extends AMQException
+{
+ public ExistingExclusiveSubscriptionException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java
new file mode 100644
index 0000000000..a13686eb56
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java
@@ -0,0 +1,22 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingSubscriptionPreventsExclusiveException signals a failure to create an exclusize subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingSubscriptionPreventsExclusiveException extends AMQException
+{
+ public ExistingSubscriptionPreventsExclusiveException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
index 6466e81dd2..d5c34152a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
@@ -38,13 +38,8 @@ import org.apache.qpid.AMQException;
*/
public class FailedDequeueException extends AMQException
{
- public FailedDequeueException(String queue)
+ public FailedDequeueException(String queue, Throwable cause)
{
- super("Failed to dequeue message from " + queue);
- }
-
- public FailedDequeueException(String queue, AMQException e)
- {
- super("Failed to dequeue message from " + queue, e);
+ super(null, "Failed to dequeue message from " + queue, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
index 090096d3c3..2fdd2791b1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
@@ -40,13 +40,8 @@ import org.apache.qpid.AMQException;
*/
public class MessageCleanupException extends AMQException
{
- public MessageCleanupException(long messageId, AMQException e)
+ public MessageCleanupException(String message, Throwable cause)
{
- super("Failed to cleanup message with id " + messageId, e);
- }
-
- public MessageCleanupException(String message)
- {
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index d6fd1eec89..afcdf062de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -35,9 +35,9 @@ import org.apache.qpid.server.RequiredDeliveryException;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(AMQMessage message)
+ public NoConsumersException(AMQMessage message, Throwable cause)
{
- super("Immediate delivery is not possible.", message);
+ super("Immediate delivery is not possible.", message, cause);
}
public AMQConstant getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
index 5539627820..560549c126 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
@@ -175,7 +175,7 @@ public class StorableMessageHandle implements AMQMessageHandle
}
} catch (Exception e)
{
- throw new AMQException("PRoblem during message enqueue", e);
+ throw new AMQException(null, "PRoblem during message enqueue", e);
}
}
@@ -191,7 +191,7 @@ public class StorableMessageHandle implements AMQMessageHandle
}
} catch (Exception e)
{
- throw new AMQException("PRoblem during message dequeue", e);
+ throw new AMQException(null, "PRoblem during message dequeue", e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a7be9f2ad2..1cebf08fa6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -108,7 +108,7 @@ public class SubscriptionImpl implements Subscription
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
- throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
+ throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session", null);
}
this.channel = channel;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
deleted file mode 100644
index cec67a8a6d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.state;
-
-import org.apache.qpid.AMQException;
-
-/**
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Not used! Delete.
- */
-public class IllegalStateTransitionException extends AMQException
-{
- private AMQState _originalState;
-
- private Class _frame;
-
- public IllegalStateTransitionException(AMQState originalState, Class frame)
- {
- super("No valid state transition defined for receiving frame " + frame + " from state " + originalState);
- _originalState = originalState;
- _frame = frame;
- }
-
- public AMQState getOriginalState()
- {
- return _originalState;
- }
-
- public Class getFrameClass()
- {
- return _frame;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
index 05756a8c23..6c001485b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
@@ -87,7 +87,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_storeContext.setPayload(xid);
} catch (Exception e)
{
- throw new AMQException("Problem during transaction begin", e);
+ throw new AMQException(null, "Problem during transaction begin", e);
}
}
}
@@ -105,7 +105,7 @@ public class DistributedTransactionalContext implements TransactionalContext
}
} catch (Exception e)
{
- throw new AMQException("Problem during transaction commit", e);
+ throw new AMQException(null, "Problem during transaction commit", e);
}
finally
{
@@ -125,7 +125,7 @@ public class DistributedTransactionalContext implements TransactionalContext
}
} catch (Exception e)
{
- throw new AMQException("Problem during transaction rollback", e);
+ throw new AMQException(null, "Problem during transaction rollback", e);
}
finally
{
@@ -152,7 +152,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst));
} catch (Exception e)
{
- throw new AMQException("Problem during transaction rollback", e);
+ throw new AMQException(null, "Problem during transaction rollback", e);
}
}
@@ -196,7 +196,7 @@ public class DistributedTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null);
}
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
@@ -219,7 +219,7 @@ public class DistributedTransactionalContext implements TransactionalContext
if (msg == null)
{
_log.info("Single ack on delivery tag " + deliveryTag);
- throw new AMQException("Single ack on delivery tag " + deliveryTag);
+ throw new AMQException(null, "Single ack on delivery tag " + deliveryTag, null);
}
if (_log.isDebugEnabled())
@@ -250,7 +250,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord());
} catch (Exception e)
{
- throw new AMQException("Problem during message dequeue", e);
+ throw new AMQException(null, "Problem during message dequeue", e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 6d776eec0f..93459beb45 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -126,7 +126,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Ack with delivery tag " + deliveryTag + " not known for channel", null);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 496c94dae9..addb0c791f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -149,7 +149,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null);
}
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
@@ -182,8 +182,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
- throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
- _channel.getChannelId());
+ throw new AMQException(null, "Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId(), null);
}
if (!_browsedAcks.contains(deliveryTag))