summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-05-21 15:11:23 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-05-21 15:11:23 +0000
commit625e140b590838df60d603f42d552a9275aae2ca (patch)
tree6b0acb350f3ead0da52b0301bfee74101d6bb7d4 /java
parent21d2df094acb8530b2fb902b5ed9a1d7db8463fd (diff)
downloadqpid-python-625e140b590838df60d603f42d552a9275aae2ca.tar.gz
Refactored exceptions to have single constructors and made room for wrapped causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java559
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java157
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java6
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java23
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java25
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java25
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java25
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java12
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQException.java43
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java2
95 files changed, 631 insertions, 972 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
index adec1b348d..61638e5e26 100644
--- a/java/broker/src/main/grammar/SelectorParser.jj
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -94,7 +94,7 @@ public class SelectorParser {
return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+ throw new AMQInvalidArgumentException(sql, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1de4d16ad4..5de59f47a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -212,7 +212,7 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ throw new AMQException(null, "Received content header without previously receiving a BasicPublish frame", null);
}
else
{
@@ -239,7 +239,7 @@ public class AMQChannel
{
if (_currentMessage == null)
{
- throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+ throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null);
}
if (_log.isTraceEnabled())
@@ -883,7 +883,7 @@ public class AMQChannel
{
if (!isTransactional())
{
- throw new AMQException("Fatal error: commit called on non-transactional channel");
+ throw new AMQException(null, "Fatal error: commit called on non-transactional channel", null);
}
_txnContext.commit();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
index 9a98af5689..3253650d14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.server;
-public class ConsumerTagNotUniqueException extends Exception
-{
-}
+/**
+ * ConsumerTagNotUniqueException indicates that a client has attempted to connect with a consumer tag that is already
+ * used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents error when clients connects with a non-unique tag.
+ * </table>
+ *
+ * @todo Consider replacing with an AMQNotAllowedException, as this is the status code returned when this happens.
+ */
+public class ConsumerTagNotUniqueException extends Exception
+{ }
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index d61bb8916a..37c5f38ea3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -41,9 +41,9 @@ public abstract class RequiredDeliveryException extends AMQException
{
private final AMQMessage _amqMessage;
- public RequiredDeliveryException(String message, AMQMessage payload)
+ public RequiredDeliveryException(String message, AMQMessage payload, Throwable cause)
{
- super(message);
+ super(null, message, cause);
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 30bbdea2ef..1604d94539 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -181,8 +181,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if (unacked.getKey() > deliveryTag)
{
//This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
- " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+ throw new AMQException(null, "UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+ " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString(), null);
}
it.remove();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index c349b44d6d..39a5bba8b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -55,7 +55,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
if (exchClass == null)
{
- throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
+ throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null);
}
try
{
@@ -65,11 +65,11 @@ public class DefaultExchangeFactory implements ExchangeFactory
}
catch (InstantiationException e)
{
- throw new AMQException("Unable to create exchange: " + e, e);
+ throw new AMQException(null, "Unable to create exchange: " + e, e);
}
catch (IllegalAccessException e)
{
- throw new AMQException("Unable to create exchange: " + e, e);
+ throw new AMQException(null, "Unable to create exchange: " + e, e);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 9066af70d9..f3bdecc32e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -71,7 +71,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
getMessageStore().createExchange(exchange);
} catch (InternalErrorException e)
{
- throw new AMQException("problem registering excahgne " + exchange, e);
+ throw new AMQException(null, "problem registering excahgne " + exchange, e);
}
}
}
@@ -99,14 +99,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
getMessageStore().removeExchange(e);
} catch (InternalErrorException e1)
{
- throw new AMQException("Problem unregistering Exchange " + name, e1);
+ throw new AMQException(null, "Problem unregistering Exchange " + name, e1);
}
}
e.close();
}
else
{
- throw new AMQException("Unknown exchange " + name);
+ throw new AMQException(null, "Unknown exchange " + name, null);
}
}
@@ -138,7 +138,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
// TODO: check where the exchange is validated
if (exch == null)
{
- throw new AMQException("Exchange '" + exchange + "' does not exist");
+ throw new AMQException(null, "Exchange '" + exchange + "' does not exist", null);
}
exch.route(payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index ab103fbd2a..01242f90de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -126,7 +126,7 @@ public class DestNameExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -156,8 +156,8 @@ public class DestNameExchange extends AbstractExchange
if (!_index.remove(routingKey, queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
}
@@ -171,7 +171,7 @@ public class DestNameExchange extends AbstractExchange
String msg = "Routing key " + routingKey + " is not known to " + this;
if (info.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index e9c5b0024c..25ec0c3a2d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -216,7 +216,7 @@ public class DestWildExchange extends AbstractExchange
if (info.isMandatory())
{
String msg = "Topic " + routingKey + " is not known to " + this;
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
@@ -276,15 +276,15 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey);
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ " with routing key " + routingKey, null);
}
if (queues.isEmpty())
{
@@ -301,7 +301,7 @@ public class DestWildExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the topic exchenge mbean", ex);
- throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the topic exchenge mbean", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
index c77f114428..07550dd808 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
@@ -38,8 +38,8 @@ import org.apache.qpid.AMQException;
*/
public class ExchangeInUseException extends AMQException
{
- public ExchangeInUseException(String exchangeName)
+ public ExchangeInUseException(String exchangeName, Throwable cause)
{
- super("Exchange " + exchangeName + " is currently in use");
+ super(null, "Exchange " + exchangeName + " is currently in use", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index b3690d3e10..28d4b19f2e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -98,7 +98,7 @@ public class FanoutExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -129,8 +129,8 @@ public class FanoutExchange extends AbstractExchange
if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ ". ", null);
}
}
@@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange
String msg = "No queues bound to " + this;
if (publishInfo.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index b4b2bc20bc..8205924207 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -231,7 +231,7 @@ public class HeadersExchange extends AbstractExchange
if (payload.getMessagePublishInfo().isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
@@ -284,7 +284,7 @@ public class HeadersExchange extends AbstractExchange
catch (JMException ex)
{
_logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
- throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
+ throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
index 1d6ab3842d..c787103c00 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
@@ -36,9 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public class NoRouteException extends RequiredDeliveryException
{
- public NoRouteException(String msg, AMQMessage message)
+ public NoRouteException(String msg, AMQMessage message, Throwable cause)
{
- super(msg, message);
+ super(msg, message, cause);
}
public AMQConstant getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 56eae279dc..9346eecbb2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ExistingExclusiveSubscriptionException;
+import org.apache.qpid.server.queue.ExistingSubscriptionPreventsExclusiveException;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -146,14 +148,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg)); // replyText
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (ExistingExclusiveSubscriptionException e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (ExistingSubscriptionPreventsExclusiveException e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index fef00942a0..43986adea7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -69,7 +69,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
SaslServer ss = session.getSaslServer();
if (ss == null)
{
- throw new AMQException("No SASL context set up in session");
+ throw new AMQException(null, "No SASL context set up in session", null);
}
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 29d6c26b66..5dbd1b18de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -138,7 +138,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
catch (SaslException e)
{
disposeSaslServer(session);
- throw new AMQException("SASL error: " + e, e);
+ throw new AMQException(null, "SASL error: " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
index 2b123bcb2d..f24c96f87f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -79,7 +79,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
AMQShortString routingKey = body.routingKey;
if (exchangeName == null)
{
- throw new AMQException("Exchange exchange must not be null");
+ throw new AMQException(null, "Exchange exchange must not be null", null);
}
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
AMQFrame response;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index be3ffcc698..855d1a2add 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -96,7 +96,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
else if (!exchange.getType().equals(body.type))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index ec9041c309..f9e94af697 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -110,7 +110,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
store.createQueue(queue);
} catch (Exception e)
{
- throw new AMQException("Problem when creating queue " + queue, e);
+ throw new AMQException(null, "Problem when creating queue " + queue, e);
}
}
queueRegistry.registerQueue(queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index cfea4637ab..eb89bf78e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -114,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
store.destroyQueue(queue);
} catch (Exception e)
{
- throw new AMQException("problem when destroying queue " + queue, e);
+ throw new AMQException(null, "problem when destroying queue " + queue, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
index 84526dbc11..31313cf024 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
@@ -71,7 +71,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
catch (JMException e)
{
- throw new AMQException("Error registering managed object " + this + ": " + e, e);
+ throw new AMQException(null, "Error registering managed object " + this + ": " + e, e);
}
}
@@ -88,7 +88,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana
}
catch (JMException e)
{
- throw new AMQException("Error unregistering managed object: " + this + ": " + e, e);
+ throw new AMQException(null, "Error unregistering managed object: " + this + ": " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d430f1af94..82e969b496 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -168,7 +168,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
+ throw new AMQException(null, "AMQProtocolSession MBean creation has failed ", ex);
}
}
@@ -199,7 +199,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
else
{
- throw new UnknnownMessageTypeException(message);
+ throw new UnknnownMessageTypeException(message, null);
}
}
@@ -321,7 +321,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
if (!wasAnyoneInterested)
{
- throw new AMQNoMethodHandlerException(evt);
+ throw new AMQNoMethodHandlerException(evt, null);
}
}
catch (AMQChannelException e)
@@ -425,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQChannel channel = getChannel(channelId);
if (channel == null)
{
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
}
return channel;
}
@@ -454,14 +454,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
if (_closed)
{
- throw new AMQException("Session is closed");
+ throw new AMQException(null, "Session is closed", null);
}
final int channelId = channel.getChannelId();
if (_closingChannelsList.contains(channelId))
{
- throw new AMQException("Session is marked awaiting channel close");
+ throw new AMQException(null, "Session is marked awaiting channel close", null);
}
if (_channelMap.size() == _maxNoOfChannels)
@@ -469,7 +469,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String errorMessage = toString() + ": maximum number of channels has been reached (" +
_maxNoOfChannels + "); can't create channel";
_logger.error(errorMessage);
- throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
+ throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
index a7599a3e0d..ee6e090e24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -39,8 +39,8 @@ import org.apache.qpid.protocol.AMQMethodEvent;
*/
public class AMQNoMethodHandlerException extends AMQException
{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt, Throwable cause)
{
- super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ super(null, "AMQMethodEvent " + evt + " was not processed by any listener on Broker.", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
index 6e72aa062f..d053884e69 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
@@ -39,8 +39,8 @@ import org.apache.qpid.framing.AMQDataBlock;
*/
public class UnknnownMessageTypeException extends AMQException
{
- public UnknnownMessageTypeException(AMQDataBlock message)
+ public UnknnownMessageTypeException(AMQDataBlock message, Throwable cause)
{
- super("Unknown message type: " + message.getClass().getName() + ": " + message);
+ super(null, "Unknown message type: " + message.getClass().getName() + ": " + message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 6ffe1af018..95f75fdb36 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,31 +20,33 @@
*/
package org.apache.qpid.server.queue;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/** Combines the information that make up a deliverable message into a more manageable form. */
+
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.messageStore.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionalContext;
/**
* Combines the information that make up a deliverable message into a more manageable form.
@@ -56,7 +58,7 @@ public class AMQMessage implements StorableMessage
// The ordered list of queues into which this message is enqueued.
private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
// Indicates whether this message is staged
- private boolean _isStaged = false;
+ private boolean _isStaged = false;
/**
* Used in clustering
@@ -89,18 +91,15 @@ public class AMQMessage implements StorableMessage
*/
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
-
private Set<Subscription> _rejectedBy = null;
-
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
-
private final int hashcode = System.identityHashCode(this);
private long _expiration;
@@ -111,8 +110,10 @@ public class AMQMessage implements StorableMessage
public void setExpiration()
{
- long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
- long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ long expiration =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
{
@@ -125,10 +126,10 @@ public class AMQMessage implements StorableMessage
{
if (timestamp != 0L)
{
- //todo perhaps use arrival time
+ // todo perhaps use arrival time
long diff = (System.currentTimeMillis() - timestamp);
- if (diff > 1000L || diff < 1000L)
+ if ((diff > 1000L) || (diff < 1000L))
{
_expiration = expiration + diff;
}
@@ -159,11 +160,12 @@ public class AMQMessage implements StorableMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Unable to get body count: " + e, e);
+
return false;
}
}
@@ -173,7 +175,10 @@ public class AMQMessage implements StorableMessage
try
{
- AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
+
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -209,11 +214,12 @@ public class AMQMessage implements StorableMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Error getting body count: " + e, e);
+
return false;
}
}
@@ -236,8 +242,7 @@ public class AMQMessage implements StorableMessage
}
}
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext)
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
@@ -257,8 +262,7 @@ public class AMQMessage implements StorableMessage
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws
- AMQException
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(store, this, true);
@@ -274,10 +278,8 @@ public class AMQMessage implements StorableMessage
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext, ContentHeaderBody contentHeader)
- throws
- AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -294,13 +296,9 @@ public class AMQMessage implements StorableMessage
* @param contentBodies
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
- MessageHandleFactory messageHandleFactory)
- throws
- AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -311,9 +309,7 @@ public class AMQMessage implements StorableMessage
}
}
- protected AMQMessage(AMQMessage msg)
- throws
- AMQException
+ protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageId = msg._messageId;
_messageHandle = msg._messageHandle;
@@ -322,9 +318,9 @@ public class AMQMessage implements StorableMessage
_transientMessageData = msg._transientMessageData;
}
- //========================================================================
+ // ========================================================================
// Interface StorableMessage
- //========================================================================
+ // ========================================================================
public long getMessageId()
{
@@ -342,10 +338,12 @@ public class AMQMessage implements StorableMessage
result = new byte[headerBody.getSize()];
bufferedResult = ByteBuffer.wrap(result);
headerBody.writePayload(bufferedResult);
- } catch (AMQException e)
+ }
+ catch (AMQException e)
{
_log.error("Error when getting message header", e);
}
+
return result;
}
@@ -355,10 +353,12 @@ public class AMQMessage implements StorableMessage
try
{
result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize();
- } catch (AMQException e)
+ }
+ catch (AMQException e)
{
_log.error("Error when getting message header size", e);
}
+
return result;
}
@@ -372,7 +372,7 @@ public class AMQMessage implements StorableMessage
return _messageHandle.getMessagePayload().length;
}
- public boolean isEnqueued()
+ public boolean isEnqueued()
{
return _queues.size() > 0;
}
@@ -401,6 +401,7 @@ public class AMQMessage implements StorableMessage
{
_log.debug("The queue position is " + _queues.indexOf(queue));
}
+
return _queues.indexOf(queue);
}
@@ -424,44 +425,40 @@ public class AMQMessage implements StorableMessage
return new BodyContentIterator();
}
- public ContentHeaderBody getContentHeaderBody()
- throws
- AMQException
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.getContentHeaderBody();
- } else
+ }
+ else
{
return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- throws
- AMQException
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
- throws
- AMQException
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(store, this, persistent);
- //if (persistent)
- // {
- _txnContext.beginTranIfNecessary();
- // }
+ // if (persistent)
+ // {
+ _txnContext.beginTranIfNecessary();
+ // }
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- // for (AMQQueue q : _transientMessageData.getDestinationQueues())
- // {
- // _messageHandle.enqueue(storeContext, _messageId, q);
- // }
+ // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ // {
+ // _messageHandle.enqueue(storeContext, _messageId, q);
+ // }
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
@@ -469,9 +466,7 @@ public class AMQMessage implements StorableMessage
}
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk)
- throws
- AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
{
_transientMessageData.addBodyLength(contentChunk.getSize());
final boolean allContentReceived = isAllContentReceived();
@@ -479,21 +474,20 @@ public class AMQMessage implements StorableMessage
if (allContentReceived)
{
deliver(storeContext);
+
return true;
- } else
+ }
+ else
{
return false;
}
}
- public boolean isAllContentReceived()
- throws
- AMQException
+ public boolean isAllContentReceived() throws AMQException
{
return _transientMessageData.isAllContentReceived();
}
-
/**
* Creates a long-lived reference to this message, and increments the count of such references, as an atomic
* operation.
@@ -501,6 +495,7 @@ public class AMQMessage implements StorableMessage
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
+
return this;
}
@@ -510,10 +505,10 @@ public class AMQMessage implements StorableMessage
protected void incrementReference()
{
_referenceCount.incrementAndGet();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
}
/**
@@ -524,9 +519,7 @@ public class AMQMessage implements StorableMessage
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference(StoreContext storeContext)
- throws
- MessageCleanupException
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
int count = _referenceCount.decrementAndGet();
@@ -538,10 +531,10 @@ public class AMQMessage implements StorableMessage
{
try
{
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
@@ -552,15 +545,17 @@ public class AMQMessage implements StorableMessage
}
catch (AMQException e)
{
- //to maintain consistency, we revert the count
+ // to maintain consistency, we revert the count
incrementReference();
- throw new MessageCleanupException(_messageId, e);
+ throw new MessageCleanupException("Failed to cleanup message with id " + _messageId, e);
}
- } else
+ }
+ else
{
if (count < 0)
{
- throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.",
+ null);
}
}
}
@@ -587,7 +582,7 @@ public class AMQMessage implements StorableMessage
public boolean isTaken(AMQQueue queue)
{
- //return _taken.get();
+ // return _taken.get();
synchronized (this)
{
@@ -604,15 +599,15 @@ public class AMQMessage implements StorableMessage
public boolean taken(AMQQueue queue, Subscription sub)
{
-// if (_taken.getAndSet(true))
-// {
-// return true;
-// }
-// else
-// {
-// _takenBySubcription = sub;
-// return false;
-// }
+ // if (_taken.getAndSet(true))
+ // {
+ // return true;
+ // }
+ // else
+ // {
+ // _takenBySubcription = sub;
+ // return false;
+ // }
synchronized (this)
{
@@ -625,10 +620,12 @@ public class AMQMessage implements StorableMessage
if (taken.getAndSet(true))
{
return true;
- } else
+ }
+ else
{
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, sub);
+
return false;
}
}
@@ -641,9 +638,8 @@ public class AMQMessage implements StorableMessage
_log.trace("Releasing Message:" + debugIdentity());
}
-// _taken.set(false);
-// _takenBySubcription = null;
-
+ // _taken.set(false);
+ // _takenBySubcription = null;
synchronized (this)
{
@@ -651,7 +647,8 @@ public class AMQMessage implements StorableMessage
if (taken == null)
{
taken = new AtomicBoolean(false);
- } else
+ }
+ else
{
taken.set(false);
}
@@ -672,9 +669,11 @@ public class AMQMessage implements StorableMessage
if (_tokens.contains(token))
{
return true;
- } else
+ }
+ else
{
_tokens.add(token);
+
return false;
}
}
@@ -687,28 +686,23 @@ public class AMQMessage implements StorableMessage
* @param queue the queue
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
- public void enqueue(AMQQueue queue)
- throws
- AMQException
+ public void enqueue(AMQQueue queue) throws AMQException
{
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue)
- throws
- AMQException
+ public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
- public boolean isPersistent()
- throws
- AMQException
+ public boolean isPersistent() throws AMQException
{
if (_transientMessageData != null)
{
return _transientMessageData.isPersistent();
- } else
+ }
+ else
{
return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
@@ -720,29 +714,27 @@ public class AMQMessage implements StorableMessage
* @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer()
- throws
- NoConsumersException
+ public void checkDeliveredToConsumer() throws NoConsumersException
{
if (_immediate && !_deliveredToConsumer)
{
- throw new NoConsumersException(this);
+ throw new NoConsumersException(this, null);
}
}
- public MessagePublishInfo getMessagePublishInfo()
- throws
- AMQException
+ public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
MessagePublishInfo pb;
if (_transientMessageData != null)
{
pb = _transientMessageData.getMessagePublishInfo();
- } else
+ }
+ else
{
pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
+
return pb;
}
@@ -773,7 +765,7 @@ public class AMQMessage implements StorableMessage
*/
public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
{
- //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+ // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -782,6 +774,7 @@ public class AMQMessage implements StorableMessage
if (now > _expiration)
{
dequeue(storecontext, queue);
+
return true;
}
}
@@ -795,9 +788,7 @@ public class AMQMessage implements StorableMessage
_deliveredToConsumer = true;
}
- private void deliver(StoreContext storeContext)
- throws
- AMQException
+ private void deliver(StoreContext storeContext) throws AMQException
{
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
@@ -806,12 +797,13 @@ public class AMQMessage implements StorableMessage
{
_log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
+
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
- _transientMessageData.getContentHeaderBody());
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
+ _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -821,9 +813,9 @@ public class AMQMessage implements StorableMessage
for (AMQQueue q : destinationQueues)
{
- //Increment the references to this message for each queue delivery.
+ // Increment the references to this message for each queue delivery.
incrementReference();
- //normal deliver so add this message at the end.
+ // normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
}
@@ -835,182 +827,181 @@ public class AMQMessage implements StorableMessage
}
}
-/*
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ /*
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
- protocolSession.writeFrame(compositeBlock);
}
- else
+
+ public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
- }
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+ deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- protocolSession.writeFrame(compositeBlock);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
- else
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
{
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
//
// Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
//
- for (int i = 1; i < bodyCount; i++)
+ while (bodyFrameIterator.hasNext())
{
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ protocolSession.writeFrame(bodyFrameIterator.next());
}
-
-
}
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- getMessagePublishInfo().getExchange(),
- replyCode, replyText,
- getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
- }
- }
-*/
+ */
public AMQMessageHandle getMessageHandle()
{
return _messageHandle;
}
-
public long getSize()
{
try
@@ -1022,15 +1013,13 @@ public class AMQMessage implements StorableMessage
catch (AMQException e)
{
_log.error(e.toString(), e);
+
return 0;
}
}
-
- public void restoreTransientMessageData()
- throws
- AMQException
+ public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
@@ -1039,25 +1028,23 @@ public class AMQMessage implements StorableMessage
_transientMessageData = transientMessageData;
}
-
public void clearTransientMessageData()
{
_transientMessageData = null;
}
-
public String toString()
{
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-// _taken + " by :" + _takenBySubcription;
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
+ + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
-// return _takenBySubcription;
+ // return _takenBySubcription;
synchronized (this)
{
return _takenBySubcriptionMap.get(queue);
@@ -1074,7 +1061,8 @@ public class AMQMessage implements StorableMessage
}
_rejectedBy.add(subscription);
- } else
+ }
+ else
{
_log.warn("Requesting rejection by null subscriber:" + debugIdentity());
}
@@ -1084,10 +1072,11 @@ public class AMQMessage implements StorableMessage
{
boolean rejected = _rejectedBy != null;
- if (rejected) // We have subscriptions that rejected this message
+ if (rejected) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
- } else // This messasge hasn't been rejected yet.
+ }
+ else // This messasge hasn't been rejected yet.
{
return rejected;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a17cbb87ff..a803ef1227 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.List;
-import java.util.Hashtable;
import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,11 +40,11 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exception.InternalErrorException;
-import org.apache.qpid.server.messageStore.StorableMessage;
-import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -55,49 +55,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
*/
public class AMQQueue implements Managable, Comparable, StorableQueue
{
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static int s_queueID =0;
- public static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
+ public static int s_queueID = 0;
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
@@ -108,7 +66,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// The list of enqueued messages.
Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>();
-
/**
* null means shared
*/
@@ -236,12 +193,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- _queueId = s_queueID++;
+ _queueId = s_queueID++;
}
- private AMQQueueMBean createMBean()
- throws
- AMQException
+ private AMQQueueMBean createMBean() throws AMQException
{
try
{
@@ -249,7 +204,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
catch (JMException ex)
{
- throw new AMQException("AMQQueue MBean creation has failed ", ex);
+ throw new AMQException(null, "AMQQueue MBean creation has failed ", ex);
}
}
@@ -443,9 +398,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
/**
* Removes the AMQMessage from the top of the queue.
*/
- public synchronized void deleteMessageFromTop(StoreContext storeContext)
- throws
- AMQException
+ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
@@ -453,16 +406,12 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
/**
* removes all the messages from the queue.
*/
- public synchronized long clearQueue(StoreContext storeContext)
- throws
- AMQException
+ public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- throws
- AMQException
+ public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
@@ -470,18 +419,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
try
{
_virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- } catch (InternalErrorException e)
+ }
+ catch (InternalErrorException e)
{
- throw new AMQException("Problem binding queue ", e);
+ throw new AMQException(null, "Problem binding queue ", e);
}
}
_bindings.addBinding(routingKey, arguments, exchange);
}
- public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
- throws
- AMQException
+ public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
@@ -489,9 +437,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
try
{
_virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- } catch (InternalErrorException e)
+ }
+ catch (InternalErrorException e)
{
- throw new AMQException("problem unbinding queue", e);
+ throw new AMQException(null, "problem unbinding queue", e);
}
}
@@ -506,14 +455,16 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
if (isExclusive())
{
decrementSubscriberCount();
- throw new ExistingExclusiveSubscription();
- } else if (exclusive)
+ throw new ExistingExclusiveSubscriptionException();
+ }
+ else if (exclusive)
{
decrementSubscriberCount();
- throw new ExistingSubscriptionPreventsExclusive();
+ throw new ExistingSubscriptionPreventsExclusiveException();
}
- } else if (exclusive)
+ }
+ else if (exclusive)
{
setExclusive(true);
}
@@ -559,9 +510,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _subscriberCount.decrementAndGet();
}
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag)
- throws
- AMQException
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
if (_logger.isDebugEnabled())
{
@@ -576,8 +525,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
== null)
{
- throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag
+ + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null);
}
removedSubscription.close();
@@ -609,21 +558,21 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return !_deliveryMgr.hasQueuedMessages();
}
- public int delete(boolean checkUnused, boolean checkEmpty)
- throws
- AMQException
+ public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
{
if (checkUnused && !_subscribers.isEmpty())
{
_logger.info("Will not delete " + this + " as it is in use.");
return 0;
- } else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
+ }
+ else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
{
_logger.info("Will not delete " + this + " as it is not empty.");
return 0;
- } else
+ }
+ else
{
delete();
@@ -631,9 +580,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- public void delete()
- throws
- AMQException
+ public void delete() throws AMQException
{
if (!_deleted.getAndSet(true))
{
@@ -650,9 +597,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- protected void autodelete()
- throws
- AMQException
+ protected void autodelete() throws AMQException
{
if (_logger.isDebugEnabled())
{
@@ -662,9 +607,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
- throws
- AMQException
+ public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -687,9 +630,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// return _deliveryMgr;
// }
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst)
- throws
- AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
_deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
@@ -705,9 +646,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg)
- throws
- FailedDequeueException
+ void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
@@ -737,9 +676,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg)
- throws
- AMQException
+ protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
{
if (!msg.isRedelivered())
{
@@ -752,7 +689,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
}
catch (JMException e)
{
- throw new AMQException("Unable to get notification from manage queue: " + e, e);
+ throw new AMQException(null, "Unable to get notification from manage queue: " + e, e);
}
}
@@ -783,9 +720,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks)
- throws
- AMQException
+ public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
{
return _deliveryMgr.performGet(session, channel, acks);
}
@@ -802,9 +737,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
public static interface Task
{
- public void doTask(AMQQueue queue)
- throws
- AMQException;
+ public void doTask(AMQQueue queue) throws AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -837,9 +770,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
}
- //========================================================================
+ // ========================================================================
// Interface StorableQueue
- //========================================================================
+ // ========================================================================
public int getQueueID()
{
@@ -861,9 +794,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
_messages.put(m.getMessageId(), m);
}
- //========================================================================
+ // ========================================================================
// Used by the Store
- //========================================================================
+ // ========================================================================
/**
* Get the list of enqueud messages
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java
new file mode 100644
index 0000000000..a5ff9e6326
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java
@@ -0,0 +1,22 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingExclusiveSubscriptionException signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingExclusiveSubscriptionException extends AMQException
+{
+ public ExistingExclusiveSubscriptionException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java
new file mode 100644
index 0000000000..a13686eb56
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java
@@ -0,0 +1,22 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingSubscriptionPreventsExclusiveException signals a failure to create an exclusize subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingSubscriptionPreventsExclusiveException extends AMQException
+{
+ public ExistingSubscriptionPreventsExclusiveException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
index 6466e81dd2..d5c34152a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
@@ -38,13 +38,8 @@ import org.apache.qpid.AMQException;
*/
public class FailedDequeueException extends AMQException
{
- public FailedDequeueException(String queue)
+ public FailedDequeueException(String queue, Throwable cause)
{
- super("Failed to dequeue message from " + queue);
- }
-
- public FailedDequeueException(String queue, AMQException e)
- {
- super("Failed to dequeue message from " + queue, e);
+ super(null, "Failed to dequeue message from " + queue, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
index 090096d3c3..2fdd2791b1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
@@ -40,13 +40,8 @@ import org.apache.qpid.AMQException;
*/
public class MessageCleanupException extends AMQException
{
- public MessageCleanupException(long messageId, AMQException e)
+ public MessageCleanupException(String message, Throwable cause)
{
- super("Failed to cleanup message with id " + messageId, e);
- }
-
- public MessageCleanupException(String message)
- {
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index d6fd1eec89..afcdf062de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -35,9 +35,9 @@ import org.apache.qpid.server.RequiredDeliveryException;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(AMQMessage message)
+ public NoConsumersException(AMQMessage message, Throwable cause)
{
- super("Immediate delivery is not possible.", message);
+ super("Immediate delivery is not possible.", message, cause);
}
public AMQConstant getReplyCode()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
index 5539627820..560549c126 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
@@ -175,7 +175,7 @@ public class StorableMessageHandle implements AMQMessageHandle
}
} catch (Exception e)
{
- throw new AMQException("PRoblem during message enqueue", e);
+ throw new AMQException(null, "PRoblem during message enqueue", e);
}
}
@@ -191,7 +191,7 @@ public class StorableMessageHandle implements AMQMessageHandle
}
} catch (Exception e)
{
- throw new AMQException("PRoblem during message dequeue", e);
+ throw new AMQException(null, "PRoblem during message dequeue", e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a7be9f2ad2..1cebf08fa6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -108,7 +108,7 @@ public class SubscriptionImpl implements Subscription
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
- throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
+ throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session", null);
}
this.channel = channel;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
deleted file mode 100644
index cec67a8a6d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.state;
-
-import org.apache.qpid.AMQException;
-
-/**
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Not used! Delete.
- */
-public class IllegalStateTransitionException extends AMQException
-{
- private AMQState _originalState;
-
- private Class _frame;
-
- public IllegalStateTransitionException(AMQState originalState, Class frame)
- {
- super("No valid state transition defined for receiving frame " + frame + " from state " + originalState);
- _originalState = originalState;
- _frame = frame;
- }
-
- public AMQState getOriginalState()
- {
- return _originalState;
- }
-
- public Class getFrameClass()
- {
- return _frame;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
index 05756a8c23..6c001485b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
@@ -87,7 +87,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_storeContext.setPayload(xid);
} catch (Exception e)
{
- throw new AMQException("Problem during transaction begin", e);
+ throw new AMQException(null, "Problem during transaction begin", e);
}
}
}
@@ -105,7 +105,7 @@ public class DistributedTransactionalContext implements TransactionalContext
}
} catch (Exception e)
{
- throw new AMQException("Problem during transaction commit", e);
+ throw new AMQException(null, "Problem during transaction commit", e);
}
finally
{
@@ -125,7 +125,7 @@ public class DistributedTransactionalContext implements TransactionalContext
}
} catch (Exception e)
{
- throw new AMQException("Problem during transaction rollback", e);
+ throw new AMQException(null, "Problem during transaction rollback", e);
}
finally
{
@@ -152,7 +152,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst));
} catch (Exception e)
{
- throw new AMQException("Problem during transaction rollback", e);
+ throw new AMQException(null, "Problem during transaction rollback", e);
}
}
@@ -196,7 +196,7 @@ public class DistributedTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null);
}
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
@@ -219,7 +219,7 @@ public class DistributedTransactionalContext implements TransactionalContext
if (msg == null)
{
_log.info("Single ack on delivery tag " + deliveryTag);
- throw new AMQException("Single ack on delivery tag " + deliveryTag);
+ throw new AMQException(null, "Single ack on delivery tag " + deliveryTag, null);
}
if (_log.isDebugEnabled())
@@ -250,7 +250,7 @@ public class DistributedTransactionalContext implements TransactionalContext
_transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord());
} catch (Exception e)
{
- throw new AMQException("Problem during message dequeue", e);
+ throw new AMQException(null, "Problem during message dequeue", e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 6d776eec0f..93459beb45 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -126,7 +126,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Ack with delivery tag " + deliveryTag + " not known for channel", null);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 496c94dae9..addb0c791f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -149,7 +149,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
{
- throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null);
}
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
@@ -182,8 +182,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
- throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
- _channel.getChannelId());
+ throw new AMQException(null, "Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId(), null);
}
if (!_browsedAcks.contains(deliveryTag))
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
index f3b21e3c64..69960e54e5 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,12 +19,13 @@
package org.apache.qpid.example.publisher;
-import org.apache.qpid.example.shared.FileUtils;
-import org.apache.qpid.example.shared.Statics;
-
import java.io.*;
+
import javax.jms.*;
+import org.apache.qpid.example.shared.FileUtils;
+import org.apache.qpid.example.shared.Statics;
+
public class FileMessageFactory
{
protected final Session _session;
@@ -47,8 +48,7 @@ public class FileMessageFactory
}
catch (IOException e)
{
- MessageFactoryException mfe = new MessageFactoryException(e.toString());
- mfe.initCause(e);
+ MessageFactoryException mfe = new MessageFactoryException(e.toString(), e);
throw mfe;
}
}
@@ -64,7 +64,8 @@ public class FileMessageFactory
{
TextMessage msg = _session.createTextMessage();
msg.setText(_payload);
- msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName());
+ msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName());
+
return msg;
}
@@ -79,6 +80,7 @@ public class FileMessageFactory
{
TextMessage msg = session.createTextMessage();
msg.setText(textMsg);
+
return msg;
}
@@ -116,6 +118,7 @@ public class FileMessageFactory
catch (JMSException e)
{
e.printStackTrace(System.out);
+
return e.toString();
}
}
@@ -124,13 +127,13 @@ public class FileMessageFactory
{
try
{
- return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
}
catch (JMSException e)
{
e.printStackTrace(System.out);
+
return false;
}
}
}
-
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
index 0a4231c977..d709da6432 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
@@ -22,33 +22,8 @@ package org.apache.qpid.example.publisher;
public class MessageFactoryException extends Exception
{
-
- private int _errorCode;
-
- public MessageFactoryException(String message)
- {
- super(message);
- }
-
public MessageFactoryException(String msg, Throwable t)
{
super(msg, t);
}
-
- public MessageFactoryException(int errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
- public MessageFactoryException(int errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
- _errorCode = errorCode;
- }
-
- public int getErrorCode()
- {
- return _errorCode;
- }
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
index 399cbc9427..245008b68a 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
@@ -25,33 +25,8 @@ package org.apache.qpid.example.publisher;
*/
public class UndeliveredMessageException extends Exception
{
-
- private int _errorCode;
-
- public UndeliveredMessageException(String message)
- {
- super(message);
- }
-
public UndeliveredMessageException(String msg, Throwable t)
{
super(msg, t);
}
-
- public UndeliveredMessageException(int errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
- public UndeliveredMessageException(int errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
- _errorCode = errorCode;
- }
-
- public int getErrorCode()
- {
- return _errorCode;
- }
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
index 6eb847ea9d..1a3d596a24 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
@@ -22,33 +22,8 @@ package org.apache.qpid.example.shared;
public class ConnectionException extends Exception
{
-
- private int _errorCode;
-
- public ConnectionException(String message)
- {
- super(message);
- }
-
public ConnectionException(String msg, Throwable t)
{
super(msg, t);
}
-
- public ConnectionException(int errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
- public ConnectionException(int errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
- _errorCode = errorCode;
- }
-
- public int getErrorCode()
- {
- return _errorCode;
- }
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
index bf805ab817..2987a9559b 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
@@ -22,33 +22,8 @@ package org.apache.qpid.example.shared;
public class ContextException extends Exception
{
-
- private int _errorCode;
-
- public ContextException(String message)
- {
- super(message);
- }
-
public ContextException(String msg, Throwable t)
{
super(msg, t);
}
-
- public ContextException(int errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
- public ContextException(int errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
- _errorCode = errorCode;
- }
-
- public int getErrorCode()
- {
- return _errorCode;
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
index b6fbb6c6bf..6bae0166d1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
@@ -35,8 +35,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQAuthenticationException extends AMQException
{
- public AMQAuthenticationException(AMQConstant error, String msg)
+ public AMQAuthenticationException(AMQConstant error, String msg, Throwable cause)
{
- super(error, msg);
+ super(error, msg, cause);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 347f5728e2..2c92cfb85e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -65,6 +65,7 @@ import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
@@ -96,8 +97,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
-
+ private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
private String _clientName;
@@ -225,6 +225,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(connection), sslConfig);
}
+ /**
+ * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
+ * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+ */
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
if (_logger.isInfoEnabled())
@@ -321,16 +325,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
message = "Unable to Connect";
}
- AMQException e = new AMQConnectionFailureException(message);
+ AMQException e = new AMQConnectionFailureException(message, null);
if (lastException != null)
{
if (lastException instanceof UnresolvedAddressException)
{
- e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString());
+ e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
+ null);
}
- e.initCause(lastException);
+ if (e.getCause() != null)
+ {
+ e.initCause(lastException);
+ }
}
throw e;
@@ -509,7 +517,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQSession session =
new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
- //_protocolHandler.addSessionByChannel(channelId, session);
+ // _protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
boolean success = false;
@@ -590,7 +598,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
catch (AMQException e)
{
deregisterSession(channelId);
- throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
+ throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
}
}
@@ -1047,7 +1055,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public void exceptionReceived(Throwable cause)
{
-
if (_logger.isDebugEnabled())
{
_logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
@@ -1060,11 +1067,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
+ AMQConstant code = null;
+
if (cause instanceof AMQException)
{
+ code = ((AMQException) cause).getErrorCode();
+ }
+
+ if (code != null)
+ {
je =
- new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()),
- "Exception thrown against " + toString() + ": " + cause);
+ new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": "
+ + cause);
}
else
{
@@ -1135,7 +1149,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1223,7 +1237,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_taskPool.execute(task);
}
-
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
index 54d5a0426f..08867b5de7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
@@ -33,8 +33,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQNoConsumersException extends AMQUndeliveredException
{
- public AMQNoConsumersException(String msg, Object bounced)
+ public AMQNoConsumersException(String msg, Object bounced, Throwable cause)
{
- super(AMQConstant.NO_CONSUMERS, msg, bounced);
+ super(AMQConstant.NO_CONSUMERS, msg, bounced, cause);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
index a314101acf..42ed9c3df7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
@@ -33,8 +33,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQNoRouteException extends AMQUndeliveredException
{
- public AMQNoRouteException(String msg, Object bounced)
+ public AMQNoRouteException(String msg, Object bounced, Throwable cause)
{
- super(AMQConstant.NO_ROUTE, msg, bounced);
+ super(AMQConstant.NO_ROUTE, msg, bounced, cause);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b7615c5b7b..8796a225ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -781,7 +781,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- amqe = new AMQException("Closing session forcibly", e);
+ amqe = new AMQException(null, "Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
@@ -1928,15 +1928,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
}
else if (errorCode == AMQConstant.NO_ROUTE)
{
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
}
else
{
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
}
}
@@ -2118,7 +2118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (JMSException e) //thrown by getMessageSelector
{
- throw new AMQException(e.getMessage(), e);
+ throw new AMQException(null, e.getMessage(), e);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 844ecbe743..00eac7f2af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -93,11 +93,11 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(existingStateManager);
if (_host != null)
{
- _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client", null));
}
else
{
- _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+ _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client", null));
}
_amqProtocolHandler.getFailoverLatch().countDown();
_amqProtocolHandler.setFailoverLatch(null);
@@ -124,7 +124,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(existingStateManager);
_amqProtocolHandler.getConnection().exceptionReceived(
new AMQDisconnectedException("Server closed connection and no failover " +
- "was successful"));
+ "was successful", null));
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index f62baf2c3a..fbf4d96647 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -70,27 +70,27 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- throw new AMQNoConsumersException("Error: " + reason, null);
+ throw new AMQNoConsumersException("Error: " + reason, null, null);
}
else if (errorCode == AMQConstant.NO_ROUTE)
{
- throw new AMQNoRouteException("Error: " + reason, null);
+ throw new AMQNoRouteException("Error: " + reason, null, null);
}
else if (errorCode == AMQConstant.INVALID_ARGUMENT)
{
_logger.debug("Broker responded with Invalid Argument.");
- throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason));
+ throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
}
else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
{
_logger.debug("Broker responded with Invalid Routing Key.");
- throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
}
else
{
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 9c8e9188ec..d8153f9c97 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -77,14 +77,14 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
//todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
- throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
+ throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
}
else
{
_logger.info("Connection close received with error code " + errorCode);
- throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index ab6acffeaf..b7776705fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -46,7 +46,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
SaslClient client = protocolSession.getSaslClient();
if (client == null)
{
- throw new AMQException("No SASL client set up - cannot proceed with authentication");
+ throw new AMQException(null, "No SASL client set up - cannot proceed with authentication", null);
}
ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
@@ -65,7 +65,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
}
catch (SaslException e)
{
- throw new AMQException("Error processing SASL challenge: " + e, e);
+ throw new AMQException(null, "Error processing SASL challenge: " + e, e);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 28c0c4f3c9..157128aebc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -92,7 +92,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
if (body.mechanisms == null)
{
- throw new AMQException("mechanism not specified in ConnectionStart method frame");
+ throw new AMQException(null, "mechanism not specified in ConnectionStart method frame", null);
}
else
{
@@ -102,7 +102,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
if (mechanism == null)
{
- throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.mechanisms), null);
}
byte[] saslResponse;
@@ -113,10 +113,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
createCallbackHandler(mechanism, protocolSession));
if (sc == null)
{
- throw new AMQException(
- "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism
+ throw new AMQException(null, "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism
+ ". Please ensure all factories are registered. See DynamicSaslRegistrar for "
- + " details of how to register non-standard SASL client providers.");
+ + " details of how to register non-standard SASL client providers.", null);
}
protocolSession.setSaslClient(sc);
@@ -125,12 +124,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
catch (SaslException e)
{
protocolSession.setSaslClient(null);
- throw new AMQException("Unable to create SASL client: " + e, e);
+ throw new AMQException(null, "Unable to create SASL client: " + e, e);
}
if (body.locales == null)
{
- throw new AMQException("Locales is not defined in Connection Start method");
+ throw new AMQException(null, "Locales is not defined in Connection Start method", null);
}
final String locales = new String(body.locales, "utf8");
@@ -142,7 +141,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
else
{
- throw new AMQException("No locales sent from server, passed: " + locales);
+ throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
}
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
@@ -170,7 +169,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
catch (UnsupportedEncodingException e)
{
- throw new AMQException("Unable to decode data: " + e, e);
+ throw new AMQException(null, "Unable to decode data: " + e, e);
}
}
else
@@ -235,7 +234,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
catch (Exception e)
{
- throw new AMQException("Unable to create callback handler: " + e, e);
+ throw new AMQException(null, "Unable to create callback handler: " + e, e);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index 63b16ebbd6..ae64ac987e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -67,7 +67,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
}
catch (JMSException je)
{
- throw new AMQException("Error populating MapMessage from ByteBuffer", je);
+ throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index c2015f9e7c..02a8544b52 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -99,7 +99,7 @@ public class MessageFactoryRegistry
MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
- throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString());
+ throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null);
}
else
{
@@ -117,7 +117,7 @@ public class MessageFactoryRegistry
MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
if (mf == null)
{
- throw new AMQException("Unsupport MIME type of " + mimeType);
+ throw new AMQException(null, "Unsupport MIME type of " + mimeType, null);
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
deleted file mode 100644
index 1f61a661d4..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-
-/**
- * @todo Not used! Delete!
- */
-public class UnexpectedBodyReceivedException extends AMQException
-{
- public UnexpectedBodyReceivedException(String msg, Throwable t)
- {
- super(msg, t);
- }
-
- public UnexpectedBodyReceivedException(String msg)
- {
- super(msg);
- }
-
- public UnexpectedBodyReceivedException(AMQConstant errorCode, String msg)
- {
- super(errorCode, msg);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index addef94215..5687ad2658 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -195,7 +195,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("sessionClose() not allowed to failover");
_connection.exceptionReceived(
new AMQDisconnectedException("Server closed connection and reconnection " +
- "not permitted."));
+ "not permitted.", null));
}
else
{
@@ -263,7 +263,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.error("Exception caught by protocol handler: " + cause, cause);
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ AMQException amqe = new AMQException(null, "Protocol handler error: " + cause, cause);
propagateExceptionToWaiters(amqe);
_connection.exceptionReceived(cause);
}
@@ -334,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
+ throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null);
}
}
catch (AMQException e)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 386aae4ad1..f691637cdc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -238,13 +238,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
- throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
+ throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
}
if (msg.getContentHeader() != null)
{
- throw new AMQException(
- "Error: received duplicate content header or did not receive correct number of content body frames");
+ throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null);
}
msg.setContentHeader(contentHeader);
@@ -259,13 +258,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
- throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
+ throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
}
if (msg.getContentHeader() == null)
{
_channelId2UnprocessedMsgMap.remove(channelId);
- throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+ throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
}
/*try
@@ -365,11 +364,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
final AMQSession session = getSession(channelId);
try
{
- session.closed(new AMQException(code, text));
+ session.closed(new AMQException(code, text, null));
}
catch (JMSException e)
{
- throw new AMQException("JMSException received while closing session", e);
+ throw new AMQException(null, "JMSException received while closing session", e);
}
return true;
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 85f98eab69..4691d48f29 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -109,7 +109,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
_lock.wait(timeout);
if (!_ready)
{
- _error = new AMQTimeoutException("Server did not respond in a timely fashion");
+ _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
_ready = true;
}
}
@@ -138,7 +138,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
else
{
- throw new AMQException("Woken up due to " + _error.getClass(), _error);
+ throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 0f43115841..c995bf40da 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -248,7 +248,7 @@ public class AMQStateManager implements AMQMethodListener
if (_currentState != s)
{
_logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
- throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
+ throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s, null);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java b/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java
deleted file mode 100644
index 41fa1ba704..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.state;
-
-import org.apache.qpid.AMQException;
-
-/**
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Not used! Delete.
- */
-public class IllegalStateTransitionException extends AMQException
-{
- private AMQState _originalState;
-
- private Class _frame;
-
- public IllegalStateTransitionException(AMQState originalState, Class frame)
- {
- super("No valid state transition defined for receiving frame " + frame +
- " from state " + originalState);
- _originalState = originalState;
- _frame = frame;
- }
-
- public AMQState getOriginalState()
- {
- return _originalState;
- }
-
- public Class getFrameClass()
- {
- return _frame;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 8a0b5e7d84..1fd657c5fb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -76,7 +76,7 @@ public class StateWaiter implements StateListener
}
else
{
- throw new AMQException("Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
+ throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
index da16baaad9..6e47e2ce28 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -31,19 +31,16 @@ import org.apache.qpid.jms.BrokerDetails;
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Represent absence of a transport medium.
* </table>
+ *
+ * @todo Error code never used. This is not an AMQException.
*/
public class AMQNoTransportForProtocolException extends AMQTransportConnectionException
{
BrokerDetails _details;
- public AMQNoTransportForProtocolException(BrokerDetails details)
- {
- this(details, "No Transport exists for specified broker protocol");
- }
-
- public AMQNoTransportForProtocolException(BrokerDetails details, String message)
+ public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause)
{
- super(null, message, null);
+ super(null, message, cause);
_details = details;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
index 24b4e03b39..6bef6216bd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
@@ -31,6 +31,8 @@ import org.apache.qpid.protocol.AMQConstant;
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Represent failure to connect through the transport medium.
* </table>
+ *
+ * @todo Error code never used. This is not an AMQException.
*/
public class AMQTransportConnectionException extends AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0bc83e9804..b9193ce14e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -67,7 +67,7 @@ public class TransportConnection
if (transport == -1)
{
- throw new AMQNoTransportForProtocolException(details);
+ throw new AMQNoTransportForProtocolException(details, null, null);
}
if (transport == _currentInstance)
diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
index 1791e7ede3..1818132be0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -30,21 +30,13 @@ import org.apache.qpid.protocol.AMQConstant;
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Represent failure to create an in VM broker.
* </table>
+ *
+ * @todo Error code never used. This is not an AMQException.
*/
public class AMQVMBrokerCreationException extends AMQTransportConnectionException
{
private int _port;
- /**
- * @param port
- *
- * @deprecated
- */
- public AMQVMBrokerCreationException(int port)
- {
- this(null, port, "Unable to create vm broker", null);
- }
-
public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause)
{
super(errorCode, message, cause);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
index 5e45d1d537..4a114321aa 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -67,27 +67,27 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
_logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- throw new AMQNoConsumersException("Error: " + reason, null);
+ throw new AMQNoConsumersException("Error: " + reason, null, null);
}
else if (errorCode == AMQConstant.NO_ROUTE)
{
- throw new AMQNoRouteException("Error: " + reason, null);
+ throw new AMQNoRouteException("Error: " + reason, null, null);
}
else if (errorCode == AMQConstant.INVALID_ARGUMENT)
{
_logger.debug("Broker responded with Invalid Argument.");
- throw new AMQInvalidArgumentException(String.valueOf(reason));
+ throw new AMQInvalidArgumentException(String.valueOf(reason), null);
}
else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
{
_logger.debug("Broker responded with Invalid Routing Key.");
- throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
}
else
{
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
index 2baaa344ef..a8a505294e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
@@ -34,9 +34,8 @@ import org.apache.qpid.AMQException;
*/
public class AMQConnectionWaitException extends AMQException
{
- public AMQConnectionWaitException(String s, Throwable e)
+ public AMQConnectionWaitException(String s, Throwable cause)
{
- super(s, e);
-
+ super(null, s, cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
index 951bd22df0..00f3ddc395 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
@@ -39,8 +39,9 @@ import org.apache.qpid.framing.AMQBody;
*/
public class AMQUnexpectedBodyTypeException extends AMQException
{
- public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body, Throwable cause)
{
- super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ super(null, "Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName(),
+ cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
index 4dd318f90d..11096ccf7e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
@@ -38,8 +38,8 @@ import org.apache.qpid.AMQException;
*/
public class AMQUnexpectedFrameTypeException extends AMQException
{
- public AMQUnexpectedFrameTypeException(String s)
+ public AMQUnexpectedFrameTypeException(String s, Throwable cause)
{
- super(s);
+ super(null, s, cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index ee5aa48db9..480a6f3603 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -107,7 +107,7 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
buffer(session, msg);
break;
default:
- throw new AMQException("Received message while in state: " + state);
+ throw new AMQException(null, "Received message while in state: " + state, null);
}
JoinState latest = _groupMgr.getState();
if (!latest.equals(state))
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 2f473b63fb..a9a7a55128 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -176,7 +176,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
}
catch (Exception e)
{
- throw new AMQException("Could not connect to leader: " + e, e);
+ throw new AMQException(null, "Could not connect to leader: " + e, e);
}
}
@@ -259,7 +259,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
catch (Exception e)
{
e.printStackTrace();
- throw new AMQException("Could not connect to prospect: " + e, e);
+ throw new AMQException(null, "Could not connect to prospect: " + e, e);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index b01ec491ec..6529c7f3e2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -207,7 +207,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body);
+ throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body, null);
}
}
@@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object);
+ throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object, null);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
index 251e91c1b9..1b2eabdc86 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
@@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQChannelClosedException extends AMQException
{
- public AMQChannelClosedException(AMQConstant errorCode, String msg)
+ public AMQChannelClosedException(AMQConstant errorCode, String msg, Throwable cause)
{
- super(errorCode, msg);
+ super(errorCode, msg, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 9efd271e4d..9d8672f433 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,22 +39,17 @@ public class AMQChannelException extends AMQException
{
private final int _classId;
private final int _methodId;
- /* AMQP version for which exception ocurred */
+
+ /** AMQP version for which exception ocurred, major code. */
private final byte major;
- private final byte minor;
- public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
- {
- super(errorCode, msg, t);
- _classId = classId;
- _methodId = methodId;
- this.major = major;
- this.minor = minor;
- }
+ /** AMQP version for which exception ocurred, minor code. */
+ private final byte minor;
- public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor)
+ public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor,
+ Throwable cause)
{
- super(errorCode, msg);
+ super(errorCode, msg, cause);
_classId = classId;
_methodId = methodId;
this.major = major;
@@ -63,6 +58,7 @@ public class AMQChannelException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
- return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage()));
+ return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(),
+ new AMQShortString(getMessage()));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
index 931c6cd87a..e95e805e9f 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
@@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQConnectionClosedException extends AMQException
{
- public AMQConnectionClosedException(AMQConstant errorCode, String msg)
+ public AMQConnectionClosedException(AMQConstant errorCode, String msg, Throwable cause)
{
- super(errorCode, msg);
+ super(errorCode, msg, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index 7edfa648ed..ba9f69a05c 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -40,24 +40,19 @@ public class AMQConnectionException extends AMQException
{
private final int _classId;
private final int _methodId;
- /* AMQP version for which exception ocurred */
+
+ /** AMQP version for which exception ocurred, major code. */
private final byte major;
+
+ /** AMQP version for which exception ocurred, minor code. */
private final byte minor;
+
boolean _closeConnetion;
public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor,
- Throwable t)
+ Throwable cause)
{
- super(errorCode, msg, t);
- _classId = classId;
- _methodId = methodId;
- this.major = major;
- this.minor = minor;
- }
-
- public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor)
- {
- super(errorCode, msg);
+ super(errorCode, msg, cause);
_classId = classId;
_methodId = methodId;
this.major = major;
@@ -69,5 +64,4 @@ public class AMQConnectionException extends AMQException
return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(),
new AMQShortString(getMessage()));
}
-
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
index 72fa2ae984..c043a00836 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -12,8 +12,8 @@ package org.apache.qpid;
*/
public class AMQConnectionFailureException extends AMQException
{
- public AMQConnectionFailureException(String message)
+ public AMQConnectionFailureException(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
index e62b2c10a2..5ec5c42ab9 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
@@ -32,8 +32,8 @@ package org.apache.qpid;
*/
public class AMQDisconnectedException extends AMQException
{
- public AMQDisconnectedException(String msg)
+ public AMQDisconnectedException(String msg, Throwable cause)
{
- super(msg);
+ super(null, msg, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java
index 41599ed880..6cbb98fd86 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -44,48 +44,11 @@ public class AMQException extends Exception
*
* @param errorCode The error code. May be null if not to be set.
* @param msg The exception message. May be null if not to be set.
- * @param t The underlying cause of the exception. May be null if not to be set.
+ * @param cause The underlying cause of the exception. May be null if not to be set.
*/
- public AMQException(AMQConstant errorCode, String msg, Throwable t)
+ public AMQException(AMQConstant errorCode, String msg, Throwable cause)
{
- super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), t);
- _errorCode = errorCode;
- }
-
- /**
- * @param message
- *
- * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
- */
- public AMQException(String message)
- {
- super(message);
- // fixme This method needs removed and all AMQExceptions need a valid error code
- _errorCode = AMQConstant.getConstant(-1);
- }
-
- /**
- * @param msg
- * @param t
- *
- * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
- */
- public AMQException(String msg, Throwable t)
- {
- super(msg, t);
- // fixme This method needs removed and all AMQExceptions need a valid error code
- _errorCode = AMQConstant.getConstant(-1);
- }
-
- /**
- * @param errorCode
- * @param msg
- *
- * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead.
- */
- public AMQException(AMQConstant errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
+ super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), cause);
_errorCode = errorCode;
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
index 278128f924..15c8bea0a4 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
@@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQInvalidArgumentException extends AMQException
{
- public AMQInvalidArgumentException(String message)
+ public AMQInvalidArgumentException(String message, Throwable cause)
{
- super(AMQConstant.INVALID_ARGUMENT, message);
+ super(AMQConstant.INVALID_ARGUMENT, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
index b5ec9845d6..c117968a29 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
@@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQInvalidRoutingKeyException extends AMQException
{
- public AMQInvalidRoutingKeyException(String message)
+ public AMQInvalidRoutingKeyException(String message, Throwable cause)
{
- super(AMQConstant.INVALID_ROUTING_KEY, message);
+ super(AMQConstant.INVALID_ROUTING_KEY, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
index 0f8d9c47db..4ae8282af5 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
@@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQTimeoutException extends AMQException
{
- public AMQTimeoutException(String message)
+ public AMQTimeoutException(String message, Throwable cause)
{
- super(AMQConstant.REQUEST_TIMEOUT, message);
+ super(AMQConstant.REQUEST_TIMEOUT, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
index 03220cc95e..1502c0efc5 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
@@ -34,9 +34,9 @@ public class AMQUndeliveredException extends AMQException
{
private Object _bounced;
- public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced)
+ public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced, Throwable cause)
{
- super(errorCode, msg);
+ super(errorCode, msg, cause);
_bounced = bounced;
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
index c4aa992a01..f483b9947b 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
@@ -15,8 +15,8 @@ package org.apache.qpid;
*/
public class AMQUnknownExchangeType extends AMQException
{
- public AMQUnknownExchangeType(String message)
+ public AMQUnknownExchangeType(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
index 6cc9c3fe00..eee3e6afcf 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
@@ -37,9 +37,9 @@ public class AMQUnresolvedAddressException extends AMQException
{
String _broker;
- public AMQUnresolvedAddressException(String message, String broker)
+ public AMQUnresolvedAddressException(String message, String broker, Throwable cause)
{
- super(message);
+ super(null, message, cause);
_broker = broker;
}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
index 1e5cc57fff..73a336321c 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
@@ -36,25 +36,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class PropertyException extends AMQException
{
- public PropertyException(String message)
+ public PropertyException(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
-
- /*
- public PropertyException(String msg, Throwable t)
- {
- super(msg, t);
- }
-
- public PropertyException(AMQConstant errorCode, String msg, Throwable t)
- {
- super(errorCode, msg, t);
- }
-
- public PropertyException(AMQConstant errorCode, String msg)
- {
- super(errorCode, msg);
- }
- */
}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
index b3c310d23c..6e2b25fb2c 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
@@ -80,7 +80,7 @@ public class PropertyUtils
if (replacement == null)
{
- throw new PropertyException("Property ${" + propertyName + "} has not been set");
+ throw new PropertyException("Property ${" + propertyName + "} has not been set", null);
}
fragment = replacement;
@@ -145,7 +145,7 @@ public class PropertyUtils
int endName = value.indexOf('}', pos);
if (endName < 0)
{
- throw new PropertyException("Syntax error in property: " + value);
+ throw new PropertyException("Syntax error in property: " + value, null);
}
String propertyName = value.substring(pos + 2, endName);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
index cd5ccf8e04..843b6a1e8c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
@@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQFrameDecodingException extends AMQException
{
- public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable t)
+ public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable cause)
{
- super(errorCode, message, t);
+ super(errorCode, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 23a1ce367e..0982847aac 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -111,7 +111,7 @@ public abstract class AMQMethodBody extends AMQBody
public AMQChannelException getChannelException(AMQConstant code, String message)
{
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor);
+ return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, null);
}
public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
@@ -121,7 +121,7 @@ public abstract class AMQMethodBody extends AMQBody
public AMQConnectionException getConnectionException(AMQConstant code, String message)
{
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, null);
}
public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
index e48fd2e7f9..ab09c1de6d 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
@@ -32,8 +32,8 @@ package org.apache.qpid.framing;
*/
public class AMQProtocolClassException extends AMQProtocolHeaderException
{
- public AMQProtocolClassException(String message)
+ public AMQProtocolClassException(String message, Throwable cause)
{
- super(message);
+ super(message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
index 1ce49aba83..6b819364da 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
@@ -34,8 +34,8 @@ import org.apache.qpid.AMQException;
*/
public class AMQProtocolHeaderException extends AMQException
{
- public AMQProtocolHeaderException(String message)
+ public AMQProtocolHeaderException(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
index 9049eace2a..3165c373a9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
@@ -32,8 +32,8 @@ package org.apache.qpid.framing;
*/
public class AMQProtocolInstanceException extends AMQProtocolHeaderException
{
- public AMQProtocolInstanceException(String message)
+ public AMQProtocolInstanceException(String message, Throwable cause)
{
- super(message);
+ super(message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
index 9074931617..c9b0973ea6 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
@@ -32,8 +32,8 @@ package org.apache.qpid.framing;
*/
public class AMQProtocolVersionException extends AMQProtocolHeaderException
{
- public AMQProtocolVersionException(String message)
+ public AMQProtocolVersionException(String message, Throwable cause)
{
- super(message);
+ super(message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 8b40fe72eb..4c253b9973 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -144,7 +144,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
if(_protocolHeader.length != 4)
{
- throw new AMQProtocolHeaderException("Protocol header should have exactly four octets");
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
}
for(int i = 0; i < 4; i++)
{
@@ -152,7 +152,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
{
try
{
- throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"));
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null);
}
catch (UnsupportedEncodingException e)
{
@@ -163,12 +163,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
if (_protocolClass != CURRENT_PROTOCOL_CLASS)
{
throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
- _protocolClass);
+ _protocolClass, null);
}
if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
{
throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
- _protocolInstance);
+ _protocolInstance, null);
}
ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
@@ -178,7 +178,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
{
// TODO: add list of available versions in list to msg...
throw new AMQProtocolVersionException("Protocol version " +
- _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.");
+ _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 1d9e30c24e..58ea392306 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -268,7 +268,7 @@ public class TxnBufferTest extends TestCase
{
public void prepare() throws AMQException
{
- throw new AMQException("Fail!");
+ throw new AMQException(null, "Fail!", null);
}
}