diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 15:11:23 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 15:11:23 +0000 |
| commit | 625e140b590838df60d603f42d552a9275aae2ca (patch) | |
| tree | 6b0acb350f3ead0da52b0301bfee74101d6bb7d4 /java | |
| parent | 21d2df094acb8530b2fb902b5ed9a1d7db8463fd (diff) | |
| download | qpid-python-625e140b590838df60d603f42d552a9275aae2ca.tar.gz | |
Refactored exceptions to have single constructors and made room for wrapped causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
95 files changed, 631 insertions, 972 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj index adec1b348d..61638e5e26 100644 --- a/java/broker/src/main/grammar/SelectorParser.jj +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -94,7 +94,7 @@ public class SelectorParser { return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+ throw new AMQInvalidArgumentException(sql, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1de4d16ad4..5de59f47a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -212,7 +212,7 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + throw new AMQException(null, "Received content header without previously receiving a BasicPublish frame", null); } else { @@ -239,7 +239,7 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content body without previously receiving a JmsPublishBody"); + throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null); } if (_log.isTraceEnabled()) @@ -883,7 +883,7 @@ public class AMQChannel { if (!isTransactional()) { - throw new AMQException("Fatal error: commit called on non-transactional channel"); + throw new AMQException(null, "Fatal error: commit called on non-transactional channel", null); } _txnContext.commit(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java index 9a98af5689..3253650d14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,16 @@ */ package org.apache.qpid.server; -public class ConsumerTagNotUniqueException extends Exception -{ -} +/** + * ConsumerTagNotUniqueException indicates that a client has attempted to connect with a consumer tag that is already + * used. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents error when clients connects with a non-unique tag. + * </table> + * + * @todo Consider replacing with an AMQNotAllowedException, as this is the status code returned when this happens. + */ +public class ConsumerTagNotUniqueException extends Exception +{ } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index d61bb8916a..37c5f38ea3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -41,9 +41,9 @@ public abstract class RequiredDeliveryException extends AMQException { private final AMQMessage _amqMessage; - public RequiredDeliveryException(String message, AMQMessage payload) + public RequiredDeliveryException(String message, AMQMessage payload, Throwable cause) { - super(message); + super(null, message, cause); // Increment the reference as this message is in the routing phase // and so will have the ref decremented as routing fails. diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 30bbdea2ef..1604d94539 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -181,8 +181,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if (unacked.getKey() > deliveryTag) { //This should not occur now. - throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + - " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); + throw new AMQException(null, "UnacknowledgedMessageMap is out of order:" + unacked.getKey() + + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString(), null); } it.remove(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index c349b44d6d..39a5bba8b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -55,7 +55,7 @@ public class DefaultExchangeFactory implements ExchangeFactory if (exchClass == null) { - throw new AMQUnknownExchangeType("Unknown exchange type: " + type); + throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null); } try { @@ -65,11 +65,11 @@ public class DefaultExchangeFactory implements ExchangeFactory } catch (InstantiationException e) { - throw new AMQException("Unable to create exchange: " + e, e); + throw new AMQException(null, "Unable to create exchange: " + e, e); } catch (IllegalAccessException e) { - throw new AMQException("Unable to create exchange: " + e, e); + throw new AMQException(null, "Unable to create exchange: " + e, e); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 9066af70d9..f3bdecc32e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -71,7 +71,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry getMessageStore().createExchange(exchange); } catch (InternalErrorException e) { - throw new AMQException("problem registering excahgne " + exchange, e); + throw new AMQException(null, "problem registering excahgne " + exchange, e); } } } @@ -99,14 +99,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry getMessageStore().removeExchange(e); } catch (InternalErrorException e1) { - throw new AMQException("Problem unregistering Exchange " + name, e1); + throw new AMQException(null, "Problem unregistering Exchange " + name, e1); } } e.close(); } else { - throw new AMQException("Unknown exchange " + name); + throw new AMQException(null, "Unknown exchange " + name, null); } } @@ -138,7 +138,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry // TODO: check where the exchange is validated if (exch == null) { - throw new AMQException("Exchange '" + exchange + "' does not exist"); + throw new AMQException(null, "Exchange '" + exchange + "' does not exist", null); } exch.route(payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index ab103fbd2a..01242f90de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -126,7 +126,7 @@ public class DestNameExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the direct exchange mbean", ex); - throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex); } } @@ -156,8 +156,8 @@ public class DestNameExchange extends AbstractExchange if (!_index.remove(routingKey, queue)) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey + ". No queue was registered with that routing key", null); } } @@ -171,7 +171,7 @@ public class DestNameExchange extends AbstractExchange String msg = "Routing key " + routingKey + " is not known to " + this; if (info.isMandatory()) { - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index e9c5b0024c..25ec0c3a2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -216,7 +216,7 @@ public class DestWildExchange extends AbstractExchange if (info.isMandatory()) { String msg = "Topic " + routingKey + " is not known to " + this; - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { @@ -276,15 +276,15 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey + ". No queue was registered with that routing key", null); } boolean removedQ = queues.remove(queue); if (!removedQ) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey, null); } if (queues.isEmpty()) { @@ -301,7 +301,7 @@ public class DestWildExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the topic exchenge mbean", ex); - throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); + throw new AMQException(null, "Exception occured in creating the topic exchenge mbean", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java index c77f114428..07550dd808 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java @@ -38,8 +38,8 @@ import org.apache.qpid.AMQException; */ public class ExchangeInUseException extends AMQException { - public ExchangeInUseException(String exchangeName) + public ExchangeInUseException(String exchangeName, Throwable cause) { - super("Exchange " + exchangeName + " is currently in use"); + super(null, "Exchange " + exchangeName + " is currently in use", cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index b3690d3e10..28d4b19f2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -98,7 +98,7 @@ public class FanoutExchange extends AbstractExchange catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -129,8 +129,8 @@ public class FanoutExchange extends AbstractExchange if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ ". ", null);
}
}
@@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange String msg = "No queues bound to " + this;
if (publishInfo.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index b4b2bc20bc..8205924207 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -231,7 +231,7 @@ public class HeadersExchange extends AbstractExchange if (payload.getMessagePublishInfo().isMandatory()) { - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { @@ -284,7 +284,7 @@ public class HeadersExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the HeadersExchangeMBean", ex); - throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex); + throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java index 1d6ab3842d..c787103c00 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java @@ -36,9 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage; */ public class NoRouteException extends RequiredDeliveryException { - public NoRouteException(String msg, AMQMessage message) + public NoRouteException(String msg, AMQMessage message, Throwable cause) { - super(msg, message); + super(msg, message, cause); } public AMQConstant getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 56eae279dc..9346eecbb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ExistingExclusiveSubscriptionException; +import org.apache.qpid.server.queue.ExistingSubscriptionPreventsExclusiveException; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -146,14 +148,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg)); // replyText } - catch (AMQQueue.ExistingExclusiveSubscription e) + catch (ExistingExclusiveSubscriptionException e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + catch (ExistingSubscriptionPreventsExclusiveException e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index fef00942a0..43986adea7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -69,7 +69,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener SaslServer ss = session.getSaslServer(); if (ss == null) { - throw new AMQException("No SASL context set up in session"); + throw new AMQException(null, "No SASL context set up in session", null); } AuthenticationResult authResult = authMgr.authenticate(ss, body.response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 29d6c26b66..5dbd1b18de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -138,7 +138,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< catch (SaslException e) { disposeSaslServer(session); - throw new AMQException("SASL error: " + e, e); + throw new AMQException(null, "SASL error: " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 2b123bcb2d..f24c96f87f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -79,7 +79,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQShortString routingKey = body.routingKey; if (exchangeName == null) { - throw new AMQException("Exchange exchange must not be null"); + throw new AMQException(null, "Exchange exchange must not be null", null); } Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); AMQFrame response; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index be3ffcc698..855d1a2add 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -96,7 +96,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange else if (!exchange.getType().equals(body.type)) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index ec9041c309..f9e94af697 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -110,7 +110,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar store.createQueue(queue); } catch (Exception e) { - throw new AMQException("Problem when creating queue " + queue, e); + throw new AMQException(null, "Problem when creating queue " + queue, e); } } queueRegistry.registerQueue(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index cfea4637ab..eb89bf78e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -114,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB store.destroyQueue(queue); } catch (Exception e) { - throw new AMQException("problem when destroying queue " + queue, e); + throw new AMQException(null, "problem when destroying queue " + queue, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index 84526dbc11..31313cf024 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -71,7 +71,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } catch (JMException e) { - throw new AMQException("Error registering managed object " + this + ": " + e, e); + throw new AMQException(null, "Error registering managed object " + this + ": " + e, e); } } @@ -88,7 +88,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } catch (JMException e) { - throw new AMQException("Error unregistering managed object: " + this + ": " + e, e); + throw new AMQException(null, "Error unregistering managed object: " + this + ": " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d430f1af94..82e969b496 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -168,7 +168,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (JMException ex) { _logger.error("AMQProtocolSession MBean creation has failed ", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException(null, "AMQProtocolSession MBean creation has failed ", ex); } } @@ -199,7 +199,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } else { - throw new UnknnownMessageTypeException(message); + throw new UnknnownMessageTypeException(message, null); } } @@ -321,7 +321,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } if (!wasAnyoneInterested) { - throw new AMQNoMethodHandlerException(evt); + throw new AMQNoMethodHandlerException(evt, null); } } catch (AMQChannelException e) @@ -425,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQChannel channel = getChannel(channelId); if (channel == null) { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null); } return channel; } @@ -454,14 +454,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { if (_closed) { - throw new AMQException("Session is closed"); + throw new AMQException(null, "Session is closed", null); } final int channelId = channel.getChannelId(); if (_closingChannelsList.contains(channelId)) { - throw new AMQException("Session is marked awaiting channel close"); + throw new AMQException(null, "Session is marked awaiting channel close", null); } if (_channelMap.size() == _maxNoOfChannels) @@ -469,7 +469,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String errorMessage = toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + "); can't create channel"; _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); + throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java index a7599a3e0d..ee6e090e24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -39,8 +39,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; */
public class AMQNoMethodHandlerException extends AMQException
{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt, Throwable cause)
{
- super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ super(null, "AMQMethodEvent " + evt + " was not processed by any listener on Broker.", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java index 6e72aa062f..d053884e69 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -39,8 +39,8 @@ import org.apache.qpid.framing.AMQDataBlock; */
public class UnknnownMessageTypeException extends AMQException
{
- public UnknnownMessageTypeException(AMQDataBlock message)
+ public UnknnownMessageTypeException(AMQDataBlock message, Throwable cause)
{
- super("Unknown message type: " + message.getClass().getName() + ": " + message);
+ super(null, "Unknown message type: " + message.getClass().getName() + ": " + message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 6ffe1af018..95f75fdb36 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,31 +20,33 @@ */ package org.apache.qpid.server.queue; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + + +/** Combines the information that make up a deliverable message into a more manageable form. */ + +import org.apache.log4j.Logger; + +import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.messageStore.MessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.messageStore.StorableMessage; import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; - -/** Combines the information that make up a deliverable message into a more manageable form. */ - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.TransactionalContext; /** * Combines the information that make up a deliverable message into a more manageable form. @@ -56,7 +58,7 @@ public class AMQMessage implements StorableMessage // The ordered list of queues into which this message is enqueued. private List<StorableQueue> _queues = new LinkedList<StorableQueue>(); // Indicates whether this message is staged - private boolean _isStaged = false; + private boolean _isStaged = false; /** * Used in clustering @@ -89,18 +91,15 @@ public class AMQMessage implements StorableMessage */ private boolean _immediate; - // private Subscription _takenBySubcription; - // private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Set<Subscription> _rejectedBy = null; - private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - private final int hashcode = System.identityHashCode(this); private long _expiration; @@ -111,8 +110,10 @@ public class AMQMessage implements StorableMessage public void setExpiration() { - long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); - long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + long expiration = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + long timestamp = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) { @@ -125,10 +126,10 @@ public class AMQMessage implements StorableMessage { if (timestamp != 0L) { - //todo perhaps use arrival time + // todo perhaps use arrival time long diff = (System.currentTimeMillis() - timestamp); - if (diff > 1000L || diff < 1000L) + if ((diff > 1000L) || (diff < 1000L)) { _expiration = expiration + diff; } @@ -159,11 +160,12 @@ public class AMQMessage implements StorableMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Unable to get body count: " + e, e); + return false; } } @@ -173,7 +175,10 @@ public class AMQMessage implements StorableMessage try { - AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), + _messageId, ++_index)); + return new AMQFrame(_channel, cb); } catch (AMQException e) @@ -209,11 +214,12 @@ public class AMQMessage implements StorableMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Error getting body count: " + e, e); + return false; } } @@ -236,8 +242,7 @@ public class AMQMessage implements StorableMessage } } - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext) + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; @@ -257,8 +262,7 @@ public class AMQMessage implements StorableMessage * @throws AMQException */ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) - throws - AMQException + throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(store, this, true); @@ -274,10 +278,8 @@ public class AMQMessage implements StorableMessage * @param txnContext * @param contentHeader */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, ContentHeaderBody contentHeader) - throws - AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader) throws AMQException { this(messageId, info, txnContext); setContentHeaderBody(contentHeader); @@ -294,13 +296,9 @@ public class AMQMessage implements StorableMessage * @param contentBodies * @throws AMQException */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, - ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, - List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, - MessageHandleFactory messageHandleFactory) - throws - AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, + MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); @@ -311,9 +309,7 @@ public class AMQMessage implements StorableMessage } } - protected AMQMessage(AMQMessage msg) - throws - AMQException + protected AMQMessage(AMQMessage msg) throws AMQException { _messageId = msg._messageId; _messageHandle = msg._messageHandle; @@ -322,9 +318,9 @@ public class AMQMessage implements StorableMessage _transientMessageData = msg._transientMessageData; } - //======================================================================== + // ======================================================================== // Interface StorableMessage - //======================================================================== + // ======================================================================== public long getMessageId() { @@ -342,10 +338,12 @@ public class AMQMessage implements StorableMessage result = new byte[headerBody.getSize()]; bufferedResult = ByteBuffer.wrap(result); headerBody.writePayload(bufferedResult); - } catch (AMQException e) + } + catch (AMQException e) { _log.error("Error when getting message header", e); } + return result; } @@ -355,10 +353,12 @@ public class AMQMessage implements StorableMessage try { result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize(); - } catch (AMQException e) + } + catch (AMQException e) { _log.error("Error when getting message header size", e); } + return result; } @@ -372,7 +372,7 @@ public class AMQMessage implements StorableMessage return _messageHandle.getMessagePayload().length; } - public boolean isEnqueued() + public boolean isEnqueued() { return _queues.size() > 0; } @@ -401,6 +401,7 @@ public class AMQMessage implements StorableMessage { _log.debug("The queue position is " + _queues.indexOf(queue)); } + return _queues.indexOf(queue); } @@ -424,44 +425,40 @@ public class AMQMessage implements StorableMessage return new BodyContentIterator(); } - public ContentHeaderBody getContentHeaderBody() - throws - AMQException + public ContentHeaderBody getContentHeaderBody() throws AMQException { if (_transientMessageData != null) { return _transientMessageData.getContentHeaderBody(); - } else + } + else { return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId); } } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - throws - AMQException + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _transientMessageData.setContentHeaderBody(contentHeaderBody); } public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) - throws - AMQException + throws AMQException { final boolean persistent = isPersistent(); _messageHandle = factory.createMessageHandle(store, this, persistent); - //if (persistent) - // { - _txnContext.beginTranIfNecessary(); - // } + // if (persistent) + // { + _txnContext.beginTranIfNecessary(); + // } // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store - // for (AMQQueue q : _transientMessageData.getDestinationQueues()) - // { - // _messageHandle.enqueue(storeContext, _messageId, q); - // } + // for (AMQQueue q : _transientMessageData.getDestinationQueues()) + // { + // _messageHandle.enqueue(storeContext, _messageId, q); + // } if (_transientMessageData.getContentHeaderBody().bodySize == 0) { @@ -469,9 +466,7 @@ public class AMQMessage implements StorableMessage } } - public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) - throws - AMQException + public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException { _transientMessageData.addBodyLength(contentChunk.getSize()); final boolean allContentReceived = isAllContentReceived(); @@ -479,21 +474,20 @@ public class AMQMessage implements StorableMessage if (allContentReceived) { deliver(storeContext); + return true; - } else + } + else { return false; } } - public boolean isAllContentReceived() - throws - AMQException + public boolean isAllContentReceived() throws AMQException { return _transientMessageData.isAllContentReceived(); } - /** * Creates a long-lived reference to this message, and increments the count of such references, as an atomic * operation. @@ -501,6 +495,7 @@ public class AMQMessage implements StorableMessage public AMQMessage takeReference() { _referenceCount.incrementAndGet(); + return this; } @@ -510,10 +505,10 @@ public class AMQMessage implements StorableMessage protected void incrementReference() { _referenceCount.incrementAndGet(); -// if (_log.isDebugEnabled()) -// { -// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } } /** @@ -524,9 +519,7 @@ public class AMQMessage implements StorableMessage * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference(StoreContext storeContext) - throws - MessageCleanupException + public void decrementReference(StoreContext storeContext) throws MessageCleanupException { int count = _referenceCount.decrementAndGet(); @@ -538,10 +531,10 @@ public class AMQMessage implements StorableMessage { try { -// if (_log.isDebugEnabled()) -// { -// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed @@ -552,15 +545,17 @@ public class AMQMessage implements StorableMessage } catch (AMQException e) { - //to maintain consistency, we revert the count + // to maintain consistency, we revert the count incrementReference(); - throw new MessageCleanupException(_messageId, e); + throw new MessageCleanupException("Failed to cleanup message with id " + _messageId, e); } - } else + } + else { if (count < 0) { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.", + null); } } } @@ -587,7 +582,7 @@ public class AMQMessage implements StorableMessage public boolean isTaken(AMQQueue queue) { - //return _taken.get(); + // return _taken.get(); synchronized (this) { @@ -604,15 +599,15 @@ public class AMQMessage implements StorableMessage public boolean taken(AMQQueue queue, Subscription sub) { -// if (_taken.getAndSet(true)) -// { -// return true; -// } -// else -// { -// _takenBySubcription = sub; -// return false; -// } + // if (_taken.getAndSet(true)) + // { + // return true; + // } + // else + // { + // _takenBySubcription = sub; + // return false; + // } synchronized (this) { @@ -625,10 +620,12 @@ public class AMQMessage implements StorableMessage if (taken.getAndSet(true)) { return true; - } else + } + else { _takenMap.put(queue, taken); _takenBySubcriptionMap.put(queue, sub); + return false; } } @@ -641,9 +638,8 @@ public class AMQMessage implements StorableMessage _log.trace("Releasing Message:" + debugIdentity()); } -// _taken.set(false); -// _takenBySubcription = null; - + // _taken.set(false); + // _takenBySubcription = null; synchronized (this) { @@ -651,7 +647,8 @@ public class AMQMessage implements StorableMessage if (taken == null) { taken = new AtomicBoolean(false); - } else + } + else { taken.set(false); } @@ -672,9 +669,11 @@ public class AMQMessage implements StorableMessage if (_tokens.contains(token)) { return true; - } else + } + else { _tokens.add(token); + return false; } } @@ -687,28 +686,23 @@ public class AMQMessage implements StorableMessage * @param queue the queue * @throws org.apache.qpid.AMQException if there is an error enqueuing the message */ - public void enqueue(AMQQueue queue) - throws - AMQException + public void enqueue(AMQQueue queue) throws AMQException { _transientMessageData.addDestinationQueue(queue); } - public void dequeue(StoreContext storeContext, AMQQueue queue) - throws - AMQException + public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException { _messageHandle.dequeue(storeContext, _messageId, queue); } - public boolean isPersistent() - throws - AMQException + public boolean isPersistent() throws AMQException { if (_transientMessageData != null) { return _transientMessageData.isPersistent(); - } else + } + else { return _messageHandle.isPersistent(getStoreContext(), _messageId); } @@ -720,29 +714,27 @@ public class AMQMessage implements StorableMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() - throws - NoConsumersException + public void checkDeliveredToConsumer() throws NoConsumersException { if (_immediate && !_deliveredToConsumer) { - throw new NoConsumersException(this); + throw new NoConsumersException(this, null); } } - public MessagePublishInfo getMessagePublishInfo() - throws - AMQException + public MessagePublishInfo getMessagePublishInfo() throws AMQException { MessagePublishInfo pb; if (_transientMessageData != null) { pb = _transientMessageData.getMessagePublishInfo(); - } else + } + else { pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } + return pb; } @@ -773,7 +765,7 @@ public class AMQMessage implements StorableMessage */ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException { - //note: If the storecontext isn't need then we can remove the getChannel() from Subscription. + // note: If the storecontext isn't need then we can remove the getChannel() from Subscription. if (_expiration != 0L) { @@ -782,6 +774,7 @@ public class AMQMessage implements StorableMessage if (now > _expiration) { dequeue(storecontext, queue); + return true; } } @@ -795,9 +788,7 @@ public class AMQMessage implements StorableMessage _deliveredToConsumer = true; } - private void deliver(StoreContext storeContext) - throws - AMQException + private void deliver(StoreContext storeContext) throws AMQException { // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible @@ -806,12 +797,13 @@ public class AMQMessage implements StorableMessage { _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues); } + try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(), - _transientMessageData.getContentHeaderBody()); + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, + _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -821,9 +813,9 @@ public class AMQMessage implements StorableMessage for (AMQQueue q : destinationQueues) { - //Increment the references to this message for each queue delivery. + // Increment the references to this message for each queue delivery. incrementReference(); - //normal deliver so add this message at the end. + // normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } } @@ -835,182 +827,181 @@ public class AMQMessage implements StorableMessage } } -/* - public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + /* + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + - protocolSession.writeFrame(compositeBlock); } - else + + public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { + ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - // - // Now start writing out the other content bodies - // - for (int i = 1; i < bodyCount; i++) - { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } } - } + private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, + deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; + } - public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException - { - ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); + private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + deliveryTag, pb.getExchange(), + queueSize, + _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? + getOkFrame.writePayload(buf); + buf.flip(); + return buf; + } - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - protocolSession.writeFrame(compositeBlock); + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + getMessagePublishInfo().getExchange(), + replyCode, replyText, + getMessagePublishInfo().getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? + returnFrame.writePayload(buf); + buf.flip(); + return buf; } - else + + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) + throws AMQException { + ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } // // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded // - for (int i = 1; i < bodyCount; i++) + while (bodyFrameIterator.hasNext()) { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + protocolSession.writeFrame(bodyFrameIterator.next()); } - - } - - - } - - - private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, - deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? - getOkFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - getMessagePublishInfo().getExchange(), - replyCode, replyText, - getMessagePublishInfo().getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? - returnFrame.writePayload(buf); - buf.flip(); - return buf; - } - - public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, - new AMQDataBlock[]{contentHeader}); - protocolSession.writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - protocolSession.writeFrame(bodyFrameIterator.next()); - } - } -*/ + */ public AMQMessageHandle getMessageHandle() { return _messageHandle; } - public long getSize() { try @@ -1022,15 +1013,13 @@ public class AMQMessage implements StorableMessage catch (AMQException e) { _log.error(e.toString(), e); + return 0; } } - - public void restoreTransientMessageData() - throws - AMQException + public void restoreTransientMessageData() throws AMQException { TransientMessageData transientMessageData = new TransientMessageData(); transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); @@ -1039,25 +1028,23 @@ public class AMQMessage implements StorableMessage _transientMessageData = transientMessageData; } - public void clearTransientMessageData() { _transientMessageData = null; } - public String toString() { -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + -// _taken + " by :" + _takenBySubcription; + // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + // _taken + " by :" + _takenBySubcription; - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + - _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { -// return _takenBySubcription; + // return _takenBySubcription; synchronized (this) { return _takenBySubcriptionMap.get(queue); @@ -1074,7 +1061,8 @@ public class AMQMessage implements StorableMessage } _rejectedBy.add(subscription); - } else + } + else { _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); } @@ -1084,10 +1072,11 @@ public class AMQMessage implements StorableMessage { boolean rejected = _rejectedBy != null; - if (rejected) // We have subscriptions that rejected this message + if (rejected) // We have subscriptions that rejected this message { return _rejectedBy.contains(subscription); - } else // This messasge hasn't been rejected yet. + } + else // This messasge hasn't been rejected yet. { return rejected; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a17cbb87ff..a803ef1227 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; -import java.util.List; -import java.util.Hashtable; import java.util.Collection; +import java.util.Hashtable; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,11 +40,11 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exception.InternalErrorException; -import org.apache.qpid.server.messageStore.StorableMessage; -import org.apache.qpid.server.messageStore.StorableQueue; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -55,49 +55,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable, StorableQueue { - /** - * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - public static int s_queueID =0; - public static final class ExistingExclusiveSubscription extends AMQException - { - - public ExistingExclusiveSubscription() - { - super(""); - } - } - - /** - * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - public static final class ExistingSubscriptionPreventsExclusive extends AMQException - { - public ExistingSubscriptionPreventsExclusive() - { - super(""); - } - } + public static int s_queueID = 0; private static final Logger _logger = Logger.getLogger(AMQQueue.class); @@ -108,7 +66,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // The list of enqueued messages. Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>(); - /** * null means shared */ @@ -236,12 +193,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - _queueId = s_queueID++; + _queueId = s_queueID++; } - private AMQQueueMBean createMBean() - throws - AMQException + private AMQQueueMBean createMBean() throws AMQException { try { @@ -249,7 +204,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } catch (JMException ex) { - throw new AMQException("AMQQueue MBean creation has failed ", ex); + throw new AMQException(null, "AMQQueue MBean creation has failed ", ex); } } @@ -443,9 +398,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue /** * Removes the AMQMessage from the top of the queue. */ - public synchronized void deleteMessageFromTop(StoreContext storeContext) - throws - AMQException + public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } @@ -453,16 +406,12 @@ public class AMQQueue implements Managable, Comparable, StorableQueue /** * removes all the messages from the queue. */ - public synchronized long clearQueue(StoreContext storeContext) - throws - AMQException + public synchronized long clearQueue(StoreContext storeContext) throws AMQException { return _deliveryMgr.clearAllMessages(storeContext); } - public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - throws - AMQException + public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) @@ -470,18 +419,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue try { _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); - } catch (InternalErrorException e) + } + catch (InternalErrorException e) { - throw new AMQException("Problem binding queue ", e); + throw new AMQException(null, "Problem binding queue ", e); } } _bindings.addBinding(routingKey, arguments, exchange); } - public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - throws - AMQException + public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { exchange.deregisterQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) @@ -489,9 +437,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue try { _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); - } catch (InternalErrorException e) + } + catch (InternalErrorException e) { - throw new AMQException("problem unbinding queue", e); + throw new AMQException(null, "problem unbinding queue", e); } } @@ -506,14 +455,16 @@ public class AMQQueue implements Managable, Comparable, StorableQueue if (isExclusive()) { decrementSubscriberCount(); - throw new ExistingExclusiveSubscription(); - } else if (exclusive) + throw new ExistingExclusiveSubscriptionException(); + } + else if (exclusive) { decrementSubscriberCount(); - throw new ExistingSubscriptionPreventsExclusive(); + throw new ExistingSubscriptionPreventsExclusiveException(); } - } else if (exclusive) + } + else if (exclusive) { setExclusive(true); } @@ -559,9 +510,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _subscriberCount.decrementAndGet(); } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) - throws - AMQException + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { if (_logger.isDebugEnabled()) { @@ -576,8 +525,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) == null) { - throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag - + " and protocol session key " + ps.getKey() + " not registered with queue " + this); + throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag + + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null); } removedSubscription.close(); @@ -609,21 +558,21 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return !_deliveryMgr.hasQueuedMessages(); } - public int delete(boolean checkUnused, boolean checkEmpty) - throws - AMQException + public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException { if (checkUnused && !_subscribers.isEmpty()) { _logger.info("Will not delete " + this + " as it is in use."); return 0; - } else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) + } + else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) { _logger.info("Will not delete " + this + " as it is not empty."); return 0; - } else + } + else { delete(); @@ -631,9 +580,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - public void delete() - throws - AMQException + public void delete() throws AMQException { if (!_deleted.getAndSet(true)) { @@ -650,9 +597,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - protected void autodelete() - throws - AMQException + protected void autodelete() throws AMQException { if (_logger.isDebugEnabled()) { @@ -662,9 +607,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) - throws - AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -687,9 +630,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // return _deliveryMgr; // } - public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) - throws - AMQException + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try @@ -705,9 +646,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - void dequeue(StoreContext storeContext, AMQMessage msg) - throws - FailedDequeueException + void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException { try { @@ -737,9 +676,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _subscribers; } - protected void updateReceivedMessageCount(AMQMessage msg) - throws - AMQException + protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { if (!msg.isRedelivered()) { @@ -752,7 +689,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } catch (JMException e) { - throw new AMQException("Unable to get notification from manage queue: " + e, e); + throw new AMQException(null, "Unable to get notification from manage queue: " + e, e); } } @@ -783,9 +720,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return "Queue(" + _name + ")@" + System.identityHashCode(this); } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) - throws - AMQException + public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException { return _deliveryMgr.performGet(session, channel, acks); } @@ -802,9 +737,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue public static interface Task { - public void doTask(AMQQueue queue) - throws - AMQException; + public void doTask(AMQQueue queue) throws AMQException; } public void addQueueDeleteTask(Task task) @@ -837,9 +770,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); } - //======================================================================== + // ======================================================================== // Interface StorableQueue - //======================================================================== + // ======================================================================== public int getQueueID() { @@ -861,9 +794,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _messages.put(m.getMessageId(), m); } - //======================================================================== + // ======================================================================== // Used by the Store - //======================================================================== + // ======================================================================== /** * Get the list of enqueud messages diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java new file mode 100644 index 0000000000..a5ff9e6326 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingExclusiveSubscriptionException signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingExclusiveSubscriptionException extends AMQException
+{
+ public ExistingExclusiveSubscriptionException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java new file mode 100644 index 0000000000..a13686eb56 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingSubscriptionPreventsExclusiveException signals a failure to create an exclusize subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingSubscriptionPreventsExclusiveException extends AMQException
+{
+ public ExistingSubscriptionPreventsExclusiveException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java index 6466e81dd2..d5c34152a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java @@ -38,13 +38,8 @@ import org.apache.qpid.AMQException; */ public class FailedDequeueException extends AMQException { - public FailedDequeueException(String queue) + public FailedDequeueException(String queue, Throwable cause) { - super("Failed to dequeue message from " + queue); - } - - public FailedDequeueException(String queue, AMQException e) - { - super("Failed to dequeue message from " + queue, e); + super(null, "Failed to dequeue message from " + queue, cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java index 090096d3c3..2fdd2791b1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java @@ -40,13 +40,8 @@ import org.apache.qpid.AMQException; */ public class MessageCleanupException extends AMQException { - public MessageCleanupException(long messageId, AMQException e) + public MessageCleanupException(String message, Throwable cause) { - super("Failed to cleanup message with id " + messageId, e); - } - - public MessageCleanupException(String message) - { - super(message); + super(null, message, cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index d6fd1eec89..afcdf062de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,9 +35,9 @@ import org.apache.qpid.server.RequiredDeliveryException; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(AMQMessage message) + public NoConsumersException(AMQMessage message, Throwable cause) { - super("Immediate delivery is not possible.", message); + super("Immediate delivery is not possible.", message, cause); } public AMQConstant getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java index 5539627820..560549c126 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java @@ -175,7 +175,7 @@ public class StorableMessageHandle implements AMQMessageHandle } } catch (Exception e) { - throw new AMQException("PRoblem during message enqueue", e); + throw new AMQException(null, "PRoblem during message enqueue", e); } } @@ -191,7 +191,7 @@ public class StorableMessageHandle implements AMQMessageHandle } } catch (Exception e) { - throw new AMQException("PRoblem during message dequeue", e); + throw new AMQException(null, "PRoblem during message dequeue", e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a7be9f2ad2..1cebf08fa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -108,7 +108,7 @@ public class SubscriptionImpl implements Subscription AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) { - throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session", null); } this.channel = channel; diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java deleted file mode 100644 index cec67a8a6d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.state; - -import org.apache.qpid.AMQException; - -/** - * @todo Not an AMQP exception as no status code. - * - * @todo Not used! Delete. - */ -public class IllegalStateTransitionException extends AMQException -{ - private AMQState _originalState; - - private Class _frame; - - public IllegalStateTransitionException(AMQState originalState, Class frame) - { - super("No valid state transition defined for receiving frame " + frame + " from state " + originalState); - _originalState = originalState; - _frame = frame; - } - - public AMQState getOriginalState() - { - return _originalState; - } - - public Class getFrameClass() - { - return _frame; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java index 05756a8c23..6c001485b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java @@ -87,7 +87,7 @@ public class DistributedTransactionalContext implements TransactionalContext _storeContext.setPayload(xid); } catch (Exception e) { - throw new AMQException("Problem during transaction begin", e); + throw new AMQException(null, "Problem during transaction begin", e); } } } @@ -105,7 +105,7 @@ public class DistributedTransactionalContext implements TransactionalContext } } catch (Exception e) { - throw new AMQException("Problem during transaction commit", e); + throw new AMQException(null, "Problem during transaction commit", e); } finally { @@ -125,7 +125,7 @@ public class DistributedTransactionalContext implements TransactionalContext } } catch (Exception e) { - throw new AMQException("Problem during transaction rollback", e); + throw new AMQException(null, "Problem during transaction rollback", e); } finally { @@ -152,7 +152,7 @@ public class DistributedTransactionalContext implements TransactionalContext _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst)); } catch (Exception e) { - throw new AMQException("Problem during transaction rollback", e); + throw new AMQException(null, "Problem during transaction rollback", e); } } @@ -196,7 +196,7 @@ public class DistributedTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null); } LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); @@ -219,7 +219,7 @@ public class DistributedTransactionalContext implements TransactionalContext if (msg == null) { _log.info("Single ack on delivery tag " + deliveryTag); - throw new AMQException("Single ack on delivery tag " + deliveryTag); + throw new AMQException(null, "Single ack on delivery tag " + deliveryTag, null); } if (_log.isDebugEnabled()) @@ -250,7 +250,7 @@ public class DistributedTransactionalContext implements TransactionalContext _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord()); } catch (Exception e) { - throw new AMQException("Problem during message dequeue", e); + throw new AMQException(null, "Problem during message dequeue", e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 6d776eec0f..93459beb45 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -126,7 +126,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Ack with delivery tag " + deliveryTag + " not known for channel", null); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 496c94dae9..addb0c791f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -149,7 +149,7 @@ public class NonTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null); } LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); @@ -182,8 +182,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); - throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + - _channel.getChannelId()); + throw new AMQException(null, "Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId(), null); } if (!_browsedAcks.contains(deliveryTag)) diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java index f3b21e3c64..69960e54e5 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,12 +19,13 @@ package org.apache.qpid.example.publisher; -import org.apache.qpid.example.shared.FileUtils; -import org.apache.qpid.example.shared.Statics; - import java.io.*; + import javax.jms.*; +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + public class FileMessageFactory { protected final Session _session; @@ -47,8 +48,7 @@ public class FileMessageFactory } catch (IOException e) { - MessageFactoryException mfe = new MessageFactoryException(e.toString()); - mfe.initCause(e); + MessageFactoryException mfe = new MessageFactoryException(e.toString(), e); throw mfe; } } @@ -64,7 +64,8 @@ public class FileMessageFactory { TextMessage msg = _session.createTextMessage(); msg.setText(_payload); - msg.setStringProperty(Statics.FILENAME_PROPERTY,new File(_filename).getName()); + msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName()); + return msg; } @@ -79,6 +80,7 @@ public class FileMessageFactory { TextMessage msg = session.createTextMessage(); msg.setText(textMsg); + return msg; } @@ -116,6 +118,7 @@ public class FileMessageFactory catch (JMSException e) { e.printStackTrace(System.out); + return e.toString(); } } @@ -124,13 +127,13 @@ public class FileMessageFactory { try { - return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); } catch (JMSException e) { e.printStackTrace(System.out); + return false; } } } - diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java index 0a4231c977..d709da6432 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java @@ -22,33 +22,8 @@ package org.apache.qpid.example.publisher; public class MessageFactoryException extends Exception { - - private int _errorCode; - - public MessageFactoryException(String message) - { - super(message); - } - public MessageFactoryException(String msg, Throwable t) { super(msg, t); } - - public MessageFactoryException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public MessageFactoryException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public int getErrorCode() - { - return _errorCode; - } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java index 399cbc9427..245008b68a 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java @@ -25,33 +25,8 @@ package org.apache.qpid.example.publisher; */ public class UndeliveredMessageException extends Exception { - - private int _errorCode; - - public UndeliveredMessageException(String message) - { - super(message); - } - public UndeliveredMessageException(String msg, Throwable t) { super(msg, t); } - - public UndeliveredMessageException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public UndeliveredMessageException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public int getErrorCode() - { - return _errorCode; - } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java index 6eb847ea9d..1a3d596a24 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java @@ -22,33 +22,8 @@ package org.apache.qpid.example.shared; public class ConnectionException extends Exception { - - private int _errorCode; - - public ConnectionException(String message) - { - super(message); - } - public ConnectionException(String msg, Throwable t) { super(msg, t); } - - public ConnectionException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public ConnectionException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public int getErrorCode() - { - return _errorCode; - } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java index bf805ab817..2987a9559b 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java @@ -22,33 +22,8 @@ package org.apache.qpid.example.shared; public class ContextException extends Exception { - - private int _errorCode; - - public ContextException(String message) - { - super(message); - } - public ContextException(String msg, Throwable t) { super(msg, t); } - - public ContextException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public ContextException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public int getErrorCode() - { - return _errorCode; - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index b6fbb6c6bf..6bae0166d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -35,8 +35,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQAuthenticationException extends AMQException { - public AMQAuthenticationException(AMQConstant error, String msg) + public AMQAuthenticationException(AMQConstant error, String msg, Throwable cause) { - super(error, msg); + super(error, msg, cause); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 347f5728e2..2c92cfb85e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -65,6 +65,7 @@ import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable @@ -96,8 +97,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>(); - + private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); private String _clientName; @@ -225,6 +225,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(connection), sslConfig); } + /** + * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception + * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. + */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { if (_logger.isInfoEnabled()) @@ -321,16 +325,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect message = "Unable to Connect"; } - AMQException e = new AMQConnectionFailureException(message); + AMQException e = new AMQConnectionFailureException(message, null); if (lastException != null) { if (lastException instanceof UnresolvedAddressException) { - e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString()); + e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), + null); } - e.initCause(lastException); + if (e.getCause() != null) + { + e.initCause(lastException); + } } throw e; @@ -509,7 +517,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); - //_protocolHandler.addSessionByChannel(channelId, session); + // _protocolHandler.addSessionByChannel(channelId, session); registerSession(channelId, session); boolean success = false; @@ -590,7 +598,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect catch (AMQException e) { deregisterSession(channelId); - throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); + throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); } } @@ -1047,7 +1055,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public void exceptionReceived(Throwable cause) { - if (_logger.isDebugEnabled()) { _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); @@ -1060,11 +1067,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { + AMQConstant code = null; + if (cause instanceof AMQException) { + code = ((AMQException) cause).getErrorCode(); + } + + if (code != null) + { je = - new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), - "Exception thrown against " + toString() + ": " + cause); + new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + + cause); } else { @@ -1135,7 +1149,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - //_protocolHandler.addSessionByChannel(s.getChannelId(), s); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,7 +1237,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _taskPool.execute(task); } - public AMQSession getSession(int channelId) { return _sessions.get(channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java index 54d5a0426f..08867b5de7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java @@ -33,8 +33,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQNoConsumersException extends AMQUndeliveredException { - public AMQNoConsumersException(String msg, Object bounced) + public AMQNoConsumersException(String msg, Object bounced, Throwable cause) { - super(AMQConstant.NO_CONSUMERS, msg, bounced); + super(AMQConstant.NO_CONSUMERS, msg, bounced, cause); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java index a314101acf..42ed9c3df7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java @@ -33,8 +33,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQNoRouteException extends AMQUndeliveredException { - public AMQNoRouteException(String msg, Object bounced) + public AMQNoRouteException(String msg, Object bounced, Throwable cause) { - super(AMQConstant.NO_ROUTE, msg, bounced); + super(AMQConstant.NO_ROUTE, msg, bounced, cause); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b7615c5b7b..8796a225ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -781,7 +781,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - amqe = new AMQException("Closing session forcibly", e); + amqe = new AMQException(null, "Closing session forcibly", e); } _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); @@ -1928,15 +1928,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); } else if (errorCode == AMQConstant.NO_ROUTE) { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); } else { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); } } @@ -2118,7 +2118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (JMSException e) //thrown by getMessageSelector { - throw new AMQException(e.getMessage(), e); + throw new AMQException(null, e.getMessage(), e); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..00eac7f2af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -93,11 +93,11 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client", null)); } else { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client", null)); } _amqProtocolHandler.getFailoverLatch().countDown(); _amqProtocolHandler.setFailoverLatch(null); @@ -124,7 +124,7 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); _amqProtocolHandler.getConnection().exceptionReceived( new AMQDisconnectedException("Server closed connection and no failover " + - "was successful")); + "was successful", null)); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index f62baf2c3a..fbf4d96647 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -70,27 +70,27 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener } if (errorCode == AMQConstant.NO_CONSUMERS) { - throw new AMQNoConsumersException("Error: " + reason, null); + throw new AMQNoConsumersException("Error: " + reason, null, null); } else if (errorCode == AMQConstant.NO_ROUTE) { - throw new AMQNoRouteException("Error: " + reason, null); + throw new AMQNoRouteException("Error: " + reason, null, null); } else if (errorCode == AMQConstant.INVALID_ARGUMENT) { _logger.debug("Broker responded with Invalid Argument."); - throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason)); + throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null); } else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) { _logger.debug("Broker responded with Invalid Routing Key."); - throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); } else { - throw new AMQChannelClosedException(errorCode, "Error: " + reason); + throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 9c8e9188ec..d8153f9c97 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -77,14 +77,14 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); + throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); } else { _logger.info("Connection close received with error code " + errorCode); - throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index ab6acffeaf..b7776705fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -46,7 +46,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener SaslClient client = protocolSession.getSaslClient(); if (client == null) { - throw new AMQException("No SASL client set up - cannot proceed with authentication"); + throw new AMQException(null, "No SASL client set up - cannot proceed with authentication", null); } ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod(); @@ -65,7 +65,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener } catch (SaslException e) { - throw new AMQException("Error processing SASL challenge: " + e, e); + throw new AMQException(null, "Error processing SASL challenge: " + e, e); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 28c0c4f3c9..157128aebc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -92,7 +92,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener if (body.mechanisms == null) { - throw new AMQException("mechanism not specified in ConnectionStart method frame"); + throw new AMQException(null, "mechanism not specified in ConnectionStart method frame", null); } else { @@ -102,7 +102,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener if (mechanism == null) { - throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); + throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.mechanisms), null); } byte[] saslResponse; @@ -113,10 +113,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener createCallbackHandler(mechanism, protocolSession)); if (sc == null) { - throw new AMQException( - "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism + throw new AMQException(null, "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " - + " details of how to register non-standard SASL client providers."); + + " details of how to register non-standard SASL client providers.", null); } protocolSession.setSaslClient(sc); @@ -125,12 +124,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener catch (SaslException e) { protocolSession.setSaslClient(null); - throw new AMQException("Unable to create SASL client: " + e, e); + throw new AMQException(null, "Unable to create SASL client: " + e, e); } if (body.locales == null) { - throw new AMQException("Locales is not defined in Connection Start method"); + throw new AMQException(null, "Locales is not defined in Connection Start method", null); } final String locales = new String(body.locales, "utf8"); @@ -142,7 +141,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } else { - throw new AMQException("No locales sent from server, passed: " + locales); + throw new AMQException(null, "No locales sent from server, passed: " + locales, null); } stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); @@ -170,7 +169,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } catch (UnsupportedEncodingException e) { - throw new AMQException("Unable to decode data: " + e, e); + throw new AMQException(null, "Unable to decode data: " + e, e); } } else @@ -235,7 +234,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } catch (Exception e) { - throw new AMQException("Unable to create callback handler: " + e, e); + throw new AMQException(null, "Unable to create callback handler: " + e, e); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index 63b16ebbd6..ae64ac987e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -67,7 +67,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm } catch (JMSException je) { - throw new AMQException("Error populating MapMessage from ByteBuffer", je); + throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index c2015f9e7c..02a8544b52 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -99,7 +99,7 @@ public class MessageFactoryRegistry MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { - throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString()); + throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null); } else { @@ -117,7 +117,7 @@ public class MessageFactoryRegistry MessageFactory mf = _mimeStringToFactoryMap.get(mimeType); if (mf == null) { - throw new AMQException("Unsupport MIME type of " + mimeType); + throw new AMQException(null, "Unsupport MIME type of " + mimeType, null); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java deleted file mode 100644 index 1f61a661d4..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; - -/** - * @todo Not used! Delete! - */ -public class UnexpectedBodyReceivedException extends AMQException -{ - public UnexpectedBodyReceivedException(String msg, Throwable t) - { - super(msg, t); - } - - public UnexpectedBodyReceivedException(String msg) - { - super(msg); - } - - public UnexpectedBodyReceivedException(AMQConstant errorCode, String msg) - { - super(errorCode, msg); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index addef94215..5687ad2658 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -195,7 +195,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("sessionClose() not allowed to failover"); _connection.exceptionReceived( new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); + "not permitted.", null)); } else { @@ -263,7 +263,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.error("Exception caught by protocol handler: " + cause, cause); // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + AMQException amqe = new AMQException(null, "Protocol handler error: " + cause, cause); propagateExceptionToWaiters(amqe); _connection.exceptionReceived(cause); } @@ -334,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); + throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null); } } catch (AMQException e) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 386aae4ad1..f691637cdc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -238,13 +238,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { - throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); + throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null); } if (msg.getContentHeader() != null) { - throw new AMQException( - "Error: received duplicate content header or did not receive correct number of content body frames"); + throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null); } msg.setContentHeader(contentHeader); @@ -259,13 +258,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { - throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); + throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null); } if (msg.getContentHeader() == null) { _channelId2UnprocessedMsgMap.remove(channelId); - throw new AMQException("Error: received content body without having received a ContentHeader frame first"); + throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null); } /*try @@ -365,11 +364,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession final AMQSession session = getSession(channelId); try { - session.closed(new AMQException(code, text)); + session.closed(new AMQException(code, text, null)); } catch (JMSException e) { - throw new AMQException("JMSException received while closing session", e); + throw new AMQException(null, "JMSException received while closing session", e); } return true; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..4691d48f29 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -109,7 +109,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener _lock.wait(timeout); if (!_ready) { - _error = new AMQTimeoutException("Server did not respond in a timely fashion"); + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); _ready = true; } } @@ -138,7 +138,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } else { - throw new AMQException("Woken up due to " + _error.getClass(), _error); + throw new AMQException(null, "Woken up due to " + _error.getClass(), _error); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 0f43115841..c995bf40da 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -248,7 +248,7 @@ public class AMQStateManager implements AMQMethodListener if (_currentState != s) { _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); - throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); + throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s, null); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java b/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java deleted file mode 100644 index 41fa1ba704..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.state; - -import org.apache.qpid.AMQException; - -/** - * @todo Not an AMQP exception as no status code. - * - * @todo Not used! Delete. - */ -public class IllegalStateTransitionException extends AMQException -{ - private AMQState _originalState; - - private Class _frame; - - public IllegalStateTransitionException(AMQState originalState, Class frame) - { - super("No valid state transition defined for receiving frame " + frame + - " from state " + originalState); - _originalState = originalState; - _frame = frame; - } - - public AMQState getOriginalState() - { - return _originalState; - } - - public Class getFrameClass() - { - return _frame; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 8a0b5e7d84..1fd657c5fb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -76,7 +76,7 @@ public class StateWaiter implements StateListener } else { - throw new AMQException("Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. + throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index da16baaad9..6e47e2ce28 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -31,19 +31,16 @@ import org.apache.qpid.jms.BrokerDetails; * <tr><th> Responsibilities <th> Collaborations * <tr><td> Represent absence of a transport medium. * </table> + * + * @todo Error code never used. This is not an AMQException. */ public class AMQNoTransportForProtocolException extends AMQTransportConnectionException { BrokerDetails _details; - public AMQNoTransportForProtocolException(BrokerDetails details) - { - this(details, "No Transport exists for specified broker protocol"); - } - - public AMQNoTransportForProtocolException(BrokerDetails details, String message) + public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause) { - super(null, message, null); + super(null, message, cause); _details = details; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java index 24b4e03b39..6bef6216bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java @@ -31,6 +31,8 @@ import org.apache.qpid.protocol.AMQConstant; * <tr><th> Responsibilities <th> Collaborations * <tr><td> Represent failure to connect through the transport medium. * </table> + * + * @todo Error code never used. This is not an AMQException. */ public class AMQTransportConnectionException extends AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 0bc83e9804..b9193ce14e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -67,7 +67,7 @@ public class TransportConnection if (transport == -1) { - throw new AMQNoTransportForProtocolException(details); + throw new AMQNoTransportForProtocolException(details, null, null); } if (transport == _currentInstance) diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 1791e7ede3..1818132be0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -30,21 +30,13 @@ import org.apache.qpid.protocol.AMQConstant; * <tr><th> Responsibilities <th> Collaborations * <tr><td> Represent failure to create an in VM broker. * </table> + * + * @todo Error code never used. This is not an AMQException. */ public class AMQVMBrokerCreationException extends AMQTransportConnectionException { private int _port; - /** - * @param port - * - * @deprecated - */ - public AMQVMBrokerCreationException(int port) - { - this(null, port, "Unable to create vm broker", null); - } - public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { super(errorCode, message, cause); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java index 5e45d1d537..4a114321aa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -67,27 +67,27 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); if (errorCode == AMQConstant.NO_CONSUMERS) { - throw new AMQNoConsumersException("Error: " + reason, null); + throw new AMQNoConsumersException("Error: " + reason, null, null); } else if (errorCode == AMQConstant.NO_ROUTE) { - throw new AMQNoRouteException("Error: " + reason, null); + throw new AMQNoRouteException("Error: " + reason, null, null); } else if (errorCode == AMQConstant.INVALID_ARGUMENT) { _logger.debug("Broker responded with Invalid Argument."); - throw new AMQInvalidArgumentException(String.valueOf(reason)); + throw new AMQInvalidArgumentException(String.valueOf(reason), null); } else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) { _logger.debug("Broker responded with Invalid Routing Key."); - throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); } else { - throw new AMQChannelClosedException(errorCode, "Error: " + reason); + throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java index 2baaa344ef..a8a505294e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java @@ -34,9 +34,8 @@ import org.apache.qpid.AMQException; */
public class AMQConnectionWaitException extends AMQException
{
- public AMQConnectionWaitException(String s, Throwable e)
+ public AMQConnectionWaitException(String s, Throwable cause)
{
- super(s, e);
-
+ super(null, s, cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java index 951bd22df0..00f3ddc395 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java @@ -39,8 +39,9 @@ import org.apache.qpid.framing.AMQBody; */
public class AMQUnexpectedBodyTypeException extends AMQException
{
- public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body, Throwable cause)
{
- super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ super(null, "Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName(),
+ cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java index 4dd318f90d..11096ccf7e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java @@ -38,8 +38,8 @@ import org.apache.qpid.AMQException; */
public class AMQUnexpectedFrameTypeException extends AMQException
{
- public AMQUnexpectedFrameTypeException(String s)
+ public AMQUnexpectedFrameTypeException(String s, Throwable cause)
{
- super(s);
+ super(null, s, cause);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index ee5aa48db9..480a6f3603 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -107,7 +107,7 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements buffer(session, msg); break; default: - throw new AMQException("Received message while in state: " + state); + throw new AMQException(null, "Received message while in state: " + state, null); } JoinState latest = _groupMgr.getState(); if (!latest.equals(state)) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 2f473b63fb..a9a7a55128 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -176,7 +176,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, } catch (Exception e) { - throw new AMQException("Could not connect to leader: " + e, e); + throw new AMQException(null, "Could not connect to leader: " + e, e); } } @@ -259,7 +259,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, catch (Exception e) { e.printStackTrace(); - throw new AMQException("Could not connect to prospect: " + e, e); + throw new AMQException(null, "Could not connect to prospect: " + e, e); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index b01ec491ec..6529c7f3e2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -207,7 +207,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body); + throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body, null); } } @@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object); + throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object, null); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java index 251e91c1b9..1b2eabdc86 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java @@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQChannelClosedException extends AMQException { - public AMQChannelClosedException(AMQConstant errorCode, String msg) + public AMQChannelClosedException(AMQConstant errorCode, String msg, Throwable cause) { - super(errorCode, msg); + super(errorCode, msg, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 9efd271e4d..9d8672f433 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,22 +39,17 @@ public class AMQChannelException extends AMQException { private final int _classId; private final int _methodId; - /* AMQP version for which exception ocurred */ + + /** AMQP version for which exception ocurred, major code. */ private final byte major; - private final byte minor; - public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) - { - super(errorCode, msg, t); - _classId = classId; - _methodId = methodId; - this.major = major; - this.minor = minor; - } + /** AMQP version for which exception ocurred, minor code. */ + private final byte minor; - public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) + public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, + Throwable cause) { - super(errorCode, msg); + super(errorCode, msg, cause); _classId = classId; _methodId = methodId; this.major = major; @@ -63,6 +58,7 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); + return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), + new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java index 931c6cd87a..e95e805e9f 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQConnectionClosedException extends AMQException { - public AMQConnectionClosedException(AMQConstant errorCode, String msg) + public AMQConnectionClosedException(AMQConstant errorCode, String msg, Throwable cause) { - super(errorCode, msg); + super(errorCode, msg, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 7edfa648ed..ba9f69a05c 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -40,24 +40,19 @@ public class AMQConnectionException extends AMQException { private final int _classId; private final int _methodId; - /* AMQP version for which exception ocurred */ + + /** AMQP version for which exception ocurred, major code. */ private final byte major; + + /** AMQP version for which exception ocurred, minor code. */ private final byte minor; + boolean _closeConnetion; public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, - Throwable t) + Throwable cause) { - super(errorCode, msg, t); - _classId = classId; - _methodId = methodId; - this.major = major; - this.minor = minor; - } - - public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) - { - super(errorCode, msg); + super(errorCode, msg, cause); _classId = classId; _methodId = methodId; this.major = major; @@ -69,5 +64,4 @@ public class AMQConnectionException extends AMQException return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } - } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index 72fa2ae984..c043a00836 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -12,8 +12,8 @@ package org.apache.qpid; */
public class AMQConnectionFailureException extends AMQException
{
- public AMQConnectionFailureException(String message)
+ public AMQConnectionFailureException(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java index e62b2c10a2..5ec5c42ab9 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java @@ -32,8 +32,8 @@ package org.apache.qpid; */ public class AMQDisconnectedException extends AMQException { - public AMQDisconnectedException(String msg) + public AMQDisconnectedException(String msg, Throwable cause) { - super(msg); + super(null, msg, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 41599ed880..6cbb98fd86 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -44,48 +44,11 @@ public class AMQException extends Exception * * @param errorCode The error code. May be null if not to be set. * @param msg The exception message. May be null if not to be set. - * @param t The underlying cause of the exception. May be null if not to be set. + * @param cause The underlying cause of the exception. May be null if not to be set. */ - public AMQException(AMQConstant errorCode, String msg, Throwable t) + public AMQException(AMQConstant errorCode, String msg, Throwable cause) { - super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), t); - _errorCode = errorCode; - } - - /** - * @param message - * - * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. - */ - public AMQException(String message) - { - super(message); - // fixme This method needs removed and all AMQExceptions need a valid error code - _errorCode = AMQConstant.getConstant(-1); - } - - /** - * @param msg - * @param t - * - * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. - */ - public AMQException(String msg, Throwable t) - { - super(msg, t); - // fixme This method needs removed and all AMQExceptions need a valid error code - _errorCode = AMQConstant.getConstant(-1); - } - - /** - * @param errorCode - * @param msg - * - * @deprecated Use {@link #AMQException(org.apache.qpid.protocol.AMQConstant, String, Throwable)} instead. - */ - public AMQException(AMQConstant errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); + super(((msg == null) ? "" : msg) + ((errorCode == null) ? "" : (" [error code " + errorCode + "]")), cause); _errorCode = errorCode; } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java index 278128f924..15c8bea0a4 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java @@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQInvalidArgumentException extends AMQException { - public AMQInvalidArgumentException(String message) + public AMQInvalidArgumentException(String message, Throwable cause) { - super(AMQConstant.INVALID_ARGUMENT, message); + super(AMQConstant.INVALID_ARGUMENT, message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java index b5ec9845d6..c117968a29 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java @@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQInvalidRoutingKeyException extends AMQException { - public AMQInvalidRoutingKeyException(String message) + public AMQInvalidRoutingKeyException(String message, Throwable cause) { - super(AMQConstant.INVALID_ROUTING_KEY, message); + super(AMQConstant.INVALID_ROUTING_KEY, message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java index 0f8d9c47db..4ae8282af5 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -32,8 +32,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQTimeoutException extends AMQException { - public AMQTimeoutException(String message) + public AMQTimeoutException(String message, Throwable cause) { - super(AMQConstant.REQUEST_TIMEOUT, message); + super(AMQConstant.REQUEST_TIMEOUT, message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java index 03220cc95e..1502c0efc5 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -34,9 +34,9 @@ public class AMQUndeliveredException extends AMQException { private Object _bounced; - public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced) + public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced, Throwable cause) { - super(errorCode, msg); + super(errorCode, msg, cause); _bounced = bounced; } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java index c4aa992a01..f483b9947b 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -15,8 +15,8 @@ package org.apache.qpid; */
public class AMQUnknownExchangeType extends AMQException
{
- public AMQUnknownExchangeType(String message)
+ public AMQUnknownExchangeType(String message, Throwable cause)
{
- super(message);
+ super(null, message, cause);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java index 6cc9c3fe00..eee3e6afcf 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java @@ -37,9 +37,9 @@ public class AMQUnresolvedAddressException extends AMQException { String _broker; - public AMQUnresolvedAddressException(String message, String broker) + public AMQUnresolvedAddressException(String message, String broker, Throwable cause) { - super(message); + super(null, message, cause); _broker = broker; } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 1e5cc57fff..73a336321c 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -36,25 +36,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class PropertyException extends AMQException { - public PropertyException(String message) + public PropertyException(String message, Throwable cause) { - super(message); + super(null, message, cause); } - - /* - public PropertyException(String msg, Throwable t) - { - super(msg, t); - } - - public PropertyException(AMQConstant errorCode, String msg, Throwable t) - { - super(errorCode, msg, t); - } - - public PropertyException(AMQConstant errorCode, String msg) - { - super(errorCode, msg); - } - */ } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java index b3c310d23c..6e2b25fb2c 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -80,7 +80,7 @@ public class PropertyUtils if (replacement == null) { - throw new PropertyException("Property ${" + propertyName + "} has not been set"); + throw new PropertyException("Property ${" + propertyName + "} has not been set", null); } fragment = replacement; @@ -145,7 +145,7 @@ public class PropertyUtils int endName = value.indexOf('}', pos); if (endName < 0) { - throw new PropertyException("Syntax error in property: " + value); + throw new PropertyException("Syntax error in property: " + value, null); } String propertyName = value.substring(pos + 2, endName); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java index cd5ccf8e04..843b6a1e8c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -34,8 +34,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQFrameDecodingException extends AMQException { - public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable t) + public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable cause) { - super(errorCode, message, t); + super(errorCode, message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 23a1ce367e..0982847aac 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -111,7 +111,7 @@ public abstract class AMQMethodBody extends AMQBody public AMQChannelException getChannelException(AMQConstant code, String message)
{
- return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor);
+ return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, null);
}
public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
@@ -121,7 +121,7 @@ public abstract class AMQMethodBody extends AMQBody public AMQConnectionException getConnectionException(AMQConstant code, String message)
{
- return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, null);
}
public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java index e48fd2e7f9..ab09c1de6d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java @@ -32,8 +32,8 @@ package org.apache.qpid.framing; */ public class AMQProtocolClassException extends AMQProtocolHeaderException { - public AMQProtocolClassException(String message) + public AMQProtocolClassException(String message, Throwable cause) { - super(message); + super(message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java index 1ce49aba83..6b819364da 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -34,8 +34,8 @@ import org.apache.qpid.AMQException; */ public class AMQProtocolHeaderException extends AMQException { - public AMQProtocolHeaderException(String message) + public AMQProtocolHeaderException(String message, Throwable cause) { - super(message); + super(null, message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java index 9049eace2a..3165c373a9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java @@ -32,8 +32,8 @@ package org.apache.qpid.framing; */ public class AMQProtocolInstanceException extends AMQProtocolHeaderException { - public AMQProtocolInstanceException(String message) + public AMQProtocolInstanceException(String message, Throwable cause) { - super(message); + super(message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java index 9074931617..c9b0973ea6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -32,8 +32,8 @@ package org.apache.qpid.framing; */ public class AMQProtocolVersionException extends AMQProtocolHeaderException { - public AMQProtocolVersionException(String message) + public AMQProtocolVersionException(String message, Throwable cause) { - super(message); + super(message, cause); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 8b40fe72eb..4c253b9973 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -144,7 +144,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData if(_protocolHeader.length != 4) { - throw new AMQProtocolHeaderException("Protocol header should have exactly four octets"); + throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null); } for(int i = 0; i < 4; i++) { @@ -152,7 +152,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData { try { - throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1")); + throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null); } catch (UnsupportedEncodingException e) { @@ -163,12 +163,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData if (_protocolClass != CURRENT_PROTOCOL_CLASS) { throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + - _protocolClass); + _protocolClass, null); } if (_protocolInstance != TCP_PROTOCOL_INSTANCE) { throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " + - _protocolInstance); + _protocolInstance, null); } ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor); @@ -178,7 +178,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData { // TODO: add list of available versions in list to msg... throw new AMQProtocolVersionException("Protocol version " + - _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker."); + _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java index 1d9e30c24e..58ea392306 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -268,7 +268,7 @@ public class TxnBufferTest extends TestCase { public void prepare() throws AMQException { - throw new AMQException("Fail!"); + throw new AMQException(null, "Fail!", null); } } |
