diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 15:11:23 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-05-21 15:11:23 +0000 |
| commit | 625e140b590838df60d603f42d552a9275aae2ca (patch) | |
| tree | 6b0acb350f3ead0da52b0301bfee74101d6bb7d4 /java/broker | |
| parent | 21d2df094acb8530b2fb902b5ed9a1d7db8463fd (diff) | |
| download | qpid-python-625e140b590838df60d603f42d552a9275aae2ca.tar.gz | |
Refactored exceptions to have single constructors and made room for wrapped causes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@540165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
37 files changed, 456 insertions, 540 deletions
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj index adec1b348d..61638e5e26 100644 --- a/java/broker/src/main/grammar/SelectorParser.jj +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -94,7 +94,7 @@ public class SelectorParser { return this.JmsSelector();
}
catch (Throwable e) {
- throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+ throw new AMQInvalidArgumentException(sql, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1de4d16ad4..5de59f47a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -212,7 +212,7 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); + throw new AMQException(null, "Received content header without previously receiving a BasicPublish frame", null); } else { @@ -239,7 +239,7 @@ public class AMQChannel { if (_currentMessage == null) { - throw new AMQException("Received content body without previously receiving a JmsPublishBody"); + throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null); } if (_log.isTraceEnabled()) @@ -883,7 +883,7 @@ public class AMQChannel { if (!isTransactional()) { - throw new AMQException("Fatal error: commit called on non-transactional channel"); + throw new AMQException(null, "Fatal error: commit called on non-transactional channel", null); } _txnContext.commit(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java index 9a98af5689..3253650d14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,16 @@ */ package org.apache.qpid.server; -public class ConsumerTagNotUniqueException extends Exception -{ -} +/** + * ConsumerTagNotUniqueException indicates that a client has attempted to connect with a consumer tag that is already + * used. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents error when clients connects with a non-unique tag. + * </table> + * + * @todo Consider replacing with an AMQNotAllowedException, as this is the status code returned when this happens. + */ +public class ConsumerTagNotUniqueException extends Exception +{ } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index d61bb8916a..37c5f38ea3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -41,9 +41,9 @@ public abstract class RequiredDeliveryException extends AMQException { private final AMQMessage _amqMessage; - public RequiredDeliveryException(String message, AMQMessage payload) + public RequiredDeliveryException(String message, AMQMessage payload, Throwable cause) { - super(message); + super(null, message, cause); // Increment the reference as this message is in the routing phase // and so will have the ref decremented as routing fails. diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 30bbdea2ef..1604d94539 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -181,8 +181,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if (unacked.getKey() > deliveryTag) { //This should not occur now. - throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + - " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); + throw new AMQException(null, "UnacknowledgedMessageMap is out of order:" + unacked.getKey() + + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString(), null); } it.remove(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index c349b44d6d..39a5bba8b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -55,7 +55,7 @@ public class DefaultExchangeFactory implements ExchangeFactory if (exchClass == null) { - throw new AMQUnknownExchangeType("Unknown exchange type: " + type); + throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null); } try { @@ -65,11 +65,11 @@ public class DefaultExchangeFactory implements ExchangeFactory } catch (InstantiationException e) { - throw new AMQException("Unable to create exchange: " + e, e); + throw new AMQException(null, "Unable to create exchange: " + e, e); } catch (IllegalAccessException e) { - throw new AMQException("Unable to create exchange: " + e, e); + throw new AMQException(null, "Unable to create exchange: " + e, e); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 9066af70d9..f3bdecc32e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -71,7 +71,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry getMessageStore().createExchange(exchange); } catch (InternalErrorException e) { - throw new AMQException("problem registering excahgne " + exchange, e); + throw new AMQException(null, "problem registering excahgne " + exchange, e); } } } @@ -99,14 +99,14 @@ public class DefaultExchangeRegistry implements ExchangeRegistry getMessageStore().removeExchange(e); } catch (InternalErrorException e1) { - throw new AMQException("Problem unregistering Exchange " + name, e1); + throw new AMQException(null, "Problem unregistering Exchange " + name, e1); } } e.close(); } else { - throw new AMQException("Unknown exchange " + name); + throw new AMQException(null, "Unknown exchange " + name, null); } } @@ -138,7 +138,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry // TODO: check where the exchange is validated if (exch == null) { - throw new AMQException("Exchange '" + exchange + "' does not exist"); + throw new AMQException(null, "Exchange '" + exchange + "' does not exist", null); } exch.route(payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index ab103fbd2a..01242f90de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -126,7 +126,7 @@ public class DestNameExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the direct exchange mbean", ex); - throw new AMQException("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex); } } @@ -156,8 +156,8 @@ public class DestNameExchange extends AbstractExchange if (!_index.remove(routingKey, queue)) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey + ". No queue was registered with that routing key", null); } } @@ -171,7 +171,7 @@ public class DestNameExchange extends AbstractExchange String msg = "Routing key " + routingKey + " is not known to " + this; if (info.isMandatory()) { - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index e9c5b0024c..25ec0c3a2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -216,7 +216,7 @@ public class DestWildExchange extends AbstractExchange if (info.isMandatory()) { String msg = "Topic " + routingKey + " is not known to " + this; - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { @@ -276,15 +276,15 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey + ". No queue was registered with that routing key", null); } boolean removedQ = queues.remove(queue); if (!removedQ) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey); + throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with routing key " + routingKey, null); } if (queues.isEmpty()) { @@ -301,7 +301,7 @@ public class DestWildExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the topic exchenge mbean", ex); - throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); + throw new AMQException(null, "Exception occured in creating the topic exchenge mbean", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java index c77f114428..07550dd808 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java @@ -38,8 +38,8 @@ import org.apache.qpid.AMQException; */ public class ExchangeInUseException extends AMQException { - public ExchangeInUseException(String exchangeName) + public ExchangeInUseException(String exchangeName, Throwable cause) { - super("Exchange " + exchangeName + " is currently in use"); + super(null, "Exchange " + exchangeName + " is currently in use", cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index b3690d3e10..28d4b19f2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -98,7 +98,7 @@ public class FanoutExchange extends AbstractExchange catch (JMException ex)
{
_logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -129,8 +129,8 @@ public class FanoutExchange extends AbstractExchange if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- ". ");
+ throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+ ". ", null);
}
}
@@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange String msg = "No queues bound to " + this;
if (publishInfo.isMandatory())
{
- throw new NoRouteException(msg, payload);
+ throw new NoRouteException(msg, payload, null);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index b4b2bc20bc..8205924207 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -231,7 +231,7 @@ public class HeadersExchange extends AbstractExchange if (payload.getMessagePublishInfo().isMandatory()) { - throw new NoRouteException(msg, payload); + throw new NoRouteException(msg, payload, null); } else { @@ -284,7 +284,7 @@ public class HeadersExchange extends AbstractExchange catch (JMException ex) { _logger.error("Exception occured in creating the HeadersExchangeMBean", ex); - throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex); + throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java index 1d6ab3842d..c787103c00 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java @@ -36,9 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage; */ public class NoRouteException extends RequiredDeliveryException { - public NoRouteException(String msg, AMQMessage message) + public NoRouteException(String msg, AMQMessage message, Throwable cause) { - super(msg, message); + super(msg, message, cause); } public AMQConstant getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 56eae279dc..9346eecbb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ExistingExclusiveSubscriptionException; +import org.apache.qpid.server.queue.ExistingSubscriptionPreventsExclusiveException; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -146,14 +148,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg)); // replyText } - catch (AMQQueue.ExistingExclusiveSubscription e) + catch (ExistingExclusiveSubscriptionException e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + catch (ExistingSubscriptionPreventsExclusiveException e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index fef00942a0..43986adea7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -69,7 +69,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener SaslServer ss = session.getSaslServer(); if (ss == null) { - throw new AMQException("No SASL context set up in session"); + throw new AMQException(null, "No SASL context set up in session", null); } AuthenticationResult authResult = authMgr.authenticate(ss, body.response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 29d6c26b66..5dbd1b18de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -138,7 +138,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< catch (SaslException e) { disposeSaslServer(session); - throw new AMQException("SASL error: " + e, e); + throw new AMQException(null, "SASL error: " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 2b123bcb2d..f24c96f87f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -79,7 +79,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo AMQShortString routingKey = body.routingKey; if (exchangeName == null) { - throw new AMQException("Exchange exchange must not be null"); + throw new AMQException(null, "Exchange exchange must not be null", null); } Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); AMQFrame response; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index be3ffcc698..855d1a2add 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -96,7 +96,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange else if (!exchange.getType().equals(body.type)) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index ec9041c309..f9e94af697 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -110,7 +110,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar store.createQueue(queue); } catch (Exception e) { - throw new AMQException("Problem when creating queue " + queue, e); + throw new AMQException(null, "Problem when creating queue " + queue, e); } } queueRegistry.registerQueue(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index cfea4637ab..eb89bf78e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -114,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB store.destroyQueue(queue); } catch (Exception e) { - throw new AMQException("problem when destroying queue " + queue, e); + throw new AMQException(null, "problem when destroying queue " + queue, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index 84526dbc11..31313cf024 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -71,7 +71,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } catch (JMException e) { - throw new AMQException("Error registering managed object " + this + ": " + e, e); + throw new AMQException(null, "Error registering managed object " + this + ": " + e, e); } } @@ -88,7 +88,7 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana } catch (JMException e) { - throw new AMQException("Error unregistering managed object: " + this + ": " + e, e); + throw new AMQException(null, "Error unregistering managed object: " + this + ": " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d430f1af94..82e969b496 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -168,7 +168,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (JMException ex) { _logger.error("AMQProtocolSession MBean creation has failed ", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException(null, "AMQProtocolSession MBean creation has failed ", ex); } } @@ -199,7 +199,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } else { - throw new UnknnownMessageTypeException(message); + throw new UnknnownMessageTypeException(message, null); } } @@ -321,7 +321,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } if (!wasAnyoneInterested) { - throw new AMQNoMethodHandlerException(evt); + throw new AMQNoMethodHandlerException(evt, null); } } catch (AMQChannelException e) @@ -425,7 +425,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQChannel channel = getChannel(channelId); if (channel == null) { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null); } return channel; } @@ -454,14 +454,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { if (_closed) { - throw new AMQException("Session is closed"); + throw new AMQException(null, "Session is closed", null); } final int channelId = channel.getChannelId(); if (_closingChannelsList.contains(channelId)) { - throw new AMQException("Session is marked awaiting channel close"); + throw new AMQException(null, "Session is marked awaiting channel close", null); } if (_channelMap.size() == _maxNoOfChannels) @@ -469,7 +469,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String errorMessage = toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + "); can't create channel"; _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); + throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java index a7599a3e0d..ee6e090e24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -39,8 +39,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; */
public class AMQNoMethodHandlerException extends AMQException
{
- public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt, Throwable cause)
{
- super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ super(null, "AMQMethodEvent " + evt + " was not processed by any listener on Broker.", cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java index 6e72aa062f..d053884e69 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -39,8 +39,8 @@ import org.apache.qpid.framing.AMQDataBlock; */
public class UnknnownMessageTypeException extends AMQException
{
- public UnknnownMessageTypeException(AMQDataBlock message)
+ public UnknnownMessageTypeException(AMQDataBlock message, Throwable cause)
{
- super("Unknown message type: " + message.getClass().getName() + ": " + message);
+ super(null, "Unknown message type: " + message.getClass().getName() + ": " + message, cause);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 6ffe1af018..95f75fdb36 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,31 +20,33 @@ */ package org.apache.qpid.server.queue; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + + +/** Combines the information that make up a deliverable message into a more manageable form. */ + +import org.apache.log4j.Logger; + +import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.messageStore.MessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.messageStore.StorableMessage; import org.apache.qpid.server.messageStore.StorableQueue; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; - -/** Combines the information that make up a deliverable message into a more manageable form. */ - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.TransactionalContext; /** * Combines the information that make up a deliverable message into a more manageable form. @@ -56,7 +58,7 @@ public class AMQMessage implements StorableMessage // The ordered list of queues into which this message is enqueued. private List<StorableQueue> _queues = new LinkedList<StorableQueue>(); // Indicates whether this message is staged - private boolean _isStaged = false; + private boolean _isStaged = false; /** * Used in clustering @@ -89,18 +91,15 @@ public class AMQMessage implements StorableMessage */ private boolean _immediate; - // private Subscription _takenBySubcription; - // private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Set<Subscription> _rejectedBy = null; - private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - private final int hashcode = System.identityHashCode(this); private long _expiration; @@ -111,8 +110,10 @@ public class AMQMessage implements StorableMessage public void setExpiration() { - long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); - long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + long expiration = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + long timestamp = + ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) { @@ -125,10 +126,10 @@ public class AMQMessage implements StorableMessage { if (timestamp != 0L) { - //todo perhaps use arrival time + // todo perhaps use arrival time long diff = (System.currentTimeMillis() - timestamp); - if (diff > 1000L || diff < 1000L) + if ((diff > 1000L) || (diff < 1000L)) { _expiration = expiration + diff; } @@ -159,11 +160,12 @@ public class AMQMessage implements StorableMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Unable to get body count: " + e, e); + return false; } } @@ -173,7 +175,10 @@ public class AMQMessage implements StorableMessage try { - AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); + AMQBody cb = + getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), + _messageId, ++_index)); + return new AMQFrame(_channel, cb); } catch (AMQException e) @@ -209,11 +214,12 @@ public class AMQMessage implements StorableMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; + return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1); } catch (AMQException e) { _log.error("Error getting body count: " + e, e); + return false; } } @@ -236,8 +242,7 @@ public class AMQMessage implements StorableMessage } } - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext) + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; @@ -257,8 +262,7 @@ public class AMQMessage implements StorableMessage * @throws AMQException */ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) - throws - AMQException + throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(store, this, true); @@ -274,10 +278,8 @@ public class AMQMessage implements StorableMessage * @param txnContext * @param contentHeader */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, ContentHeaderBody contentHeader) - throws - AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader) throws AMQException { this(messageId, info, txnContext); setContentHeaderBody(contentHeader); @@ -294,13 +296,9 @@ public class AMQMessage implements StorableMessage * @param contentBodies * @throws AMQException */ - public AMQMessage(Long messageId, MessagePublishInfo info, - TransactionalContext txnContext, - ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, - List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, - MessageHandleFactory messageHandleFactory) - throws - AMQException + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, + ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, + MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); @@ -311,9 +309,7 @@ public class AMQMessage implements StorableMessage } } - protected AMQMessage(AMQMessage msg) - throws - AMQException + protected AMQMessage(AMQMessage msg) throws AMQException { _messageId = msg._messageId; _messageHandle = msg._messageHandle; @@ -322,9 +318,9 @@ public class AMQMessage implements StorableMessage _transientMessageData = msg._transientMessageData; } - //======================================================================== + // ======================================================================== // Interface StorableMessage - //======================================================================== + // ======================================================================== public long getMessageId() { @@ -342,10 +338,12 @@ public class AMQMessage implements StorableMessage result = new byte[headerBody.getSize()]; bufferedResult = ByteBuffer.wrap(result); headerBody.writePayload(bufferedResult); - } catch (AMQException e) + } + catch (AMQException e) { _log.error("Error when getting message header", e); } + return result; } @@ -355,10 +353,12 @@ public class AMQMessage implements StorableMessage try { result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize(); - } catch (AMQException e) + } + catch (AMQException e) { _log.error("Error when getting message header size", e); } + return result; } @@ -372,7 +372,7 @@ public class AMQMessage implements StorableMessage return _messageHandle.getMessagePayload().length; } - public boolean isEnqueued() + public boolean isEnqueued() { return _queues.size() > 0; } @@ -401,6 +401,7 @@ public class AMQMessage implements StorableMessage { _log.debug("The queue position is " + _queues.indexOf(queue)); } + return _queues.indexOf(queue); } @@ -424,44 +425,40 @@ public class AMQMessage implements StorableMessage return new BodyContentIterator(); } - public ContentHeaderBody getContentHeaderBody() - throws - AMQException + public ContentHeaderBody getContentHeaderBody() throws AMQException { if (_transientMessageData != null) { return _transientMessageData.getContentHeaderBody(); - } else + } + else { return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId); } } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - throws - AMQException + public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _transientMessageData.setContentHeaderBody(contentHeaderBody); } public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) - throws - AMQException + throws AMQException { final boolean persistent = isPersistent(); _messageHandle = factory.createMessageHandle(store, this, persistent); - //if (persistent) - // { - _txnContext.beginTranIfNecessary(); - // } + // if (persistent) + // { + _txnContext.beginTranIfNecessary(); + // } // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store - // for (AMQQueue q : _transientMessageData.getDestinationQueues()) - // { - // _messageHandle.enqueue(storeContext, _messageId, q); - // } + // for (AMQQueue q : _transientMessageData.getDestinationQueues()) + // { + // _messageHandle.enqueue(storeContext, _messageId, q); + // } if (_transientMessageData.getContentHeaderBody().bodySize == 0) { @@ -469,9 +466,7 @@ public class AMQMessage implements StorableMessage } } - public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) - throws - AMQException + public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException { _transientMessageData.addBodyLength(contentChunk.getSize()); final boolean allContentReceived = isAllContentReceived(); @@ -479,21 +474,20 @@ public class AMQMessage implements StorableMessage if (allContentReceived) { deliver(storeContext); + return true; - } else + } + else { return false; } } - public boolean isAllContentReceived() - throws - AMQException + public boolean isAllContentReceived() throws AMQException { return _transientMessageData.isAllContentReceived(); } - /** * Creates a long-lived reference to this message, and increments the count of such references, as an atomic * operation. @@ -501,6 +495,7 @@ public class AMQMessage implements StorableMessage public AMQMessage takeReference() { _referenceCount.incrementAndGet(); + return this; } @@ -510,10 +505,10 @@ public class AMQMessage implements StorableMessage protected void incrementReference() { _referenceCount.incrementAndGet(); -// if (_log.isDebugEnabled()) -// { -// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } } /** @@ -524,9 +519,7 @@ public class AMQMessage implements StorableMessage * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference(StoreContext storeContext) - throws - MessageCleanupException + public void decrementReference(StoreContext storeContext) throws MessageCleanupException { int count = _referenceCount.decrementAndGet(); @@ -538,10 +531,10 @@ public class AMQMessage implements StorableMessage { try { -// if (_log.isDebugEnabled()) -// { -// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); -// } + // if (_log.isDebugEnabled()) + // { + // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + // } // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed @@ -552,15 +545,17 @@ public class AMQMessage implements StorableMessage } catch (AMQException e) { - //to maintain consistency, we revert the count + // to maintain consistency, we revert the count incrementReference(); - throw new MessageCleanupException(_messageId, e); + throw new MessageCleanupException("Failed to cleanup message with id " + _messageId, e); } - } else + } + else { if (count < 0) { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.", + null); } } } @@ -587,7 +582,7 @@ public class AMQMessage implements StorableMessage public boolean isTaken(AMQQueue queue) { - //return _taken.get(); + // return _taken.get(); synchronized (this) { @@ -604,15 +599,15 @@ public class AMQMessage implements StorableMessage public boolean taken(AMQQueue queue, Subscription sub) { -// if (_taken.getAndSet(true)) -// { -// return true; -// } -// else -// { -// _takenBySubcription = sub; -// return false; -// } + // if (_taken.getAndSet(true)) + // { + // return true; + // } + // else + // { + // _takenBySubcription = sub; + // return false; + // } synchronized (this) { @@ -625,10 +620,12 @@ public class AMQMessage implements StorableMessage if (taken.getAndSet(true)) { return true; - } else + } + else { _takenMap.put(queue, taken); _takenBySubcriptionMap.put(queue, sub); + return false; } } @@ -641,9 +638,8 @@ public class AMQMessage implements StorableMessage _log.trace("Releasing Message:" + debugIdentity()); } -// _taken.set(false); -// _takenBySubcription = null; - + // _taken.set(false); + // _takenBySubcription = null; synchronized (this) { @@ -651,7 +647,8 @@ public class AMQMessage implements StorableMessage if (taken == null) { taken = new AtomicBoolean(false); - } else + } + else { taken.set(false); } @@ -672,9 +669,11 @@ public class AMQMessage implements StorableMessage if (_tokens.contains(token)) { return true; - } else + } + else { _tokens.add(token); + return false; } } @@ -687,28 +686,23 @@ public class AMQMessage implements StorableMessage * @param queue the queue * @throws org.apache.qpid.AMQException if there is an error enqueuing the message */ - public void enqueue(AMQQueue queue) - throws - AMQException + public void enqueue(AMQQueue queue) throws AMQException { _transientMessageData.addDestinationQueue(queue); } - public void dequeue(StoreContext storeContext, AMQQueue queue) - throws - AMQException + public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException { _messageHandle.dequeue(storeContext, _messageId, queue); } - public boolean isPersistent() - throws - AMQException + public boolean isPersistent() throws AMQException { if (_transientMessageData != null) { return _transientMessageData.isPersistent(); - } else + } + else { return _messageHandle.isPersistent(getStoreContext(), _messageId); } @@ -720,29 +714,27 @@ public class AMQMessage implements StorableMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() - throws - NoConsumersException + public void checkDeliveredToConsumer() throws NoConsumersException { if (_immediate && !_deliveredToConsumer) { - throw new NoConsumersException(this); + throw new NoConsumersException(this, null); } } - public MessagePublishInfo getMessagePublishInfo() - throws - AMQException + public MessagePublishInfo getMessagePublishInfo() throws AMQException { MessagePublishInfo pb; if (_transientMessageData != null) { pb = _transientMessageData.getMessagePublishInfo(); - } else + } + else { pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } + return pb; } @@ -773,7 +765,7 @@ public class AMQMessage implements StorableMessage */ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException { - //note: If the storecontext isn't need then we can remove the getChannel() from Subscription. + // note: If the storecontext isn't need then we can remove the getChannel() from Subscription. if (_expiration != 0L) { @@ -782,6 +774,7 @@ public class AMQMessage implements StorableMessage if (now > _expiration) { dequeue(storecontext, queue); + return true; } } @@ -795,9 +788,7 @@ public class AMQMessage implements StorableMessage _deliveredToConsumer = true; } - private void deliver(StoreContext storeContext) - throws - AMQException + private void deliver(StoreContext storeContext) throws AMQException { // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible @@ -806,12 +797,13 @@ public class AMQMessage implements StorableMessage { _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues); } + try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(), - _transientMessageData.getContentHeaderBody()); + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, + _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content // now that it has all been received, before we attempt delivery @@ -821,9 +813,9 @@ public class AMQMessage implements StorableMessage for (AMQQueue q : destinationQueues) { - //Increment the references to this message for each queue delivery. + // Increment the references to this message for each queue delivery. incrementReference(); - //normal deliver so add this message at the end. + // normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } } @@ -835,182 +827,181 @@ public class AMQMessage implements StorableMessage } } -/* - public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + /* + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } + + + } + - protocolSession.writeFrame(compositeBlock); } - else + + public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { + ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + protocolSession.writeFrame(compositeBlock); + } + else + { + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); + + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for (int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + } - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - // - // Now start writing out the other content bodies - // - for (int i = 1; i < bodyCount; i++) - { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } } - } + private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, + deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? + deliverFrame.writePayload(buf); + buf.flip(); + return buf; + } - public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException - { - ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); + private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) + throws AMQException + { + MessagePublishInfo pb = getMessagePublishInfo(); + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + deliveryTag, pb.getExchange(), + queueSize, + _messageHandle.isRedelivered(), + pb.getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? + getOkFrame.writePayload(buf); + buf.flip(); + return buf; + } - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) + private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - protocolSession.writeFrame(compositeBlock); + AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + getMessagePublishInfo().getExchange(), + replyCode, replyText, + getMessagePublishInfo().getRoutingKey()); + ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? + returnFrame.writePayload(buf); + buf.flip(); + return buf; } - else + + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) + throws AMQException { + ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); + + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); + if (bodyFrameIterator.hasNext()) + { + AMQDataBlock firstContentBody = bodyFrameIterator.next(); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + } + else + { + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, + new AMQDataBlock[]{contentHeader}); + protocolSession.writeFrame(compositeBlock); + } // // Now start writing out the other content bodies + // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded // - for (int i = 1; i < bodyCount; i++) + while (bodyFrameIterator.hasNext()) { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); + protocolSession.writeFrame(bodyFrameIterator.next()); } - - } - - - } - - - private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, - deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? - getOkFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - getMessagePublishInfo().getExchange(), - replyCode, replyText, - getMessagePublishInfo().getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? - returnFrame.writePayload(buf); - buf.flip(); - return buf; - } - - public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, - new AMQDataBlock[]{contentHeader}); - protocolSession.writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - protocolSession.writeFrame(bodyFrameIterator.next()); - } - } -*/ + */ public AMQMessageHandle getMessageHandle() { return _messageHandle; } - public long getSize() { try @@ -1022,15 +1013,13 @@ public class AMQMessage implements StorableMessage catch (AMQException e) { _log.error(e.toString(), e); + return 0; } } - - public void restoreTransientMessageData() - throws - AMQException + public void restoreTransientMessageData() throws AMQException { TransientMessageData transientMessageData = new TransientMessageData(); transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); @@ -1039,25 +1028,23 @@ public class AMQMessage implements StorableMessage _transientMessageData = transientMessageData; } - public void clearTransientMessageData() { _transientMessageData = null; } - public String toString() { -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + -// _taken + " by :" + _takenBySubcription; + // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + // _taken + " by :" + _takenBySubcription; - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + - _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { -// return _takenBySubcription; + // return _takenBySubcription; synchronized (this) { return _takenBySubcriptionMap.get(queue); @@ -1074,7 +1061,8 @@ public class AMQMessage implements StorableMessage } _rejectedBy.add(subscription); - } else + } + else { _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); } @@ -1084,10 +1072,11 @@ public class AMQMessage implements StorableMessage { boolean rejected = _rejectedBy != null; - if (rejected) // We have subscriptions that rejected this message + if (rejected) // We have subscriptions that rejected this message { return _rejectedBy.contains(subscription); - } else // This messasge hasn't been rejected yet. + } + else // This messasge hasn't been rejected yet. { return rejected; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a17cbb87ff..a803ef1227 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; -import java.util.List; -import java.util.Hashtable; import java.util.Collection; +import java.util.Hashtable; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,11 +40,11 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exception.InternalErrorException; -import org.apache.qpid.server.messageStore.StorableMessage; -import org.apache.qpid.server.messageStore.StorableQueue; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.messageStore.StorableMessage; +import org.apache.qpid.server.messageStore.StorableQueue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -55,49 +55,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable, StorableQueue { - /** - * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - public static int s_queueID =0; - public static final class ExistingExclusiveSubscription extends AMQException - { - - public ExistingExclusiveSubscription() - { - super(""); - } - } - - /** - * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription - * already exists. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists. - * </table> - * - * @todo Not an AMQP exception as no status code. - * - * @todo Move to top level, used outside this class. - */ - public static final class ExistingSubscriptionPreventsExclusive extends AMQException - { - public ExistingSubscriptionPreventsExclusive() - { - super(""); - } - } + public static int s_queueID = 0; private static final Logger _logger = Logger.getLogger(AMQQueue.class); @@ -108,7 +66,6 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // The list of enqueued messages. Hashtable<Long, StorableMessage> _messages = new Hashtable<Long, StorableMessage>(); - /** * null means shared */ @@ -236,12 +193,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - _queueId = s_queueID++; + _queueId = s_queueID++; } - private AMQQueueMBean createMBean() - throws - AMQException + private AMQQueueMBean createMBean() throws AMQException { try { @@ -249,7 +204,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } catch (JMException ex) { - throw new AMQException("AMQQueue MBean creation has failed ", ex); + throw new AMQException(null, "AMQQueue MBean creation has failed ", ex); } } @@ -443,9 +398,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue /** * Removes the AMQMessage from the top of the queue. */ - public synchronized void deleteMessageFromTop(StoreContext storeContext) - throws - AMQException + public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } @@ -453,16 +406,12 @@ public class AMQQueue implements Managable, Comparable, StorableQueue /** * removes all the messages from the queue. */ - public synchronized long clearQueue(StoreContext storeContext) - throws - AMQException + public synchronized long clearQueue(StoreContext storeContext) throws AMQException { return _deliveryMgr.clearAllMessages(storeContext); } - public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - throws - AMQException + public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) @@ -470,18 +419,17 @@ public class AMQQueue implements Managable, Comparable, StorableQueue try { _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); - } catch (InternalErrorException e) + } + catch (InternalErrorException e) { - throw new AMQException("Problem binding queue ", e); + throw new AMQException(null, "Problem binding queue ", e); } } _bindings.addBinding(routingKey, arguments, exchange); } - public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) - throws - AMQException + public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { exchange.deregisterQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) @@ -489,9 +437,10 @@ public class AMQQueue implements Managable, Comparable, StorableQueue try { _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); - } catch (InternalErrorException e) + } + catch (InternalErrorException e) { - throw new AMQException("problem unbinding queue", e); + throw new AMQException(null, "problem unbinding queue", e); } } @@ -506,14 +455,16 @@ public class AMQQueue implements Managable, Comparable, StorableQueue if (isExclusive()) { decrementSubscriberCount(); - throw new ExistingExclusiveSubscription(); - } else if (exclusive) + throw new ExistingExclusiveSubscriptionException(); + } + else if (exclusive) { decrementSubscriberCount(); - throw new ExistingSubscriptionPreventsExclusive(); + throw new ExistingSubscriptionPreventsExclusiveException(); } - } else if (exclusive) + } + else if (exclusive) { setExclusive(true); } @@ -559,9 +510,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _subscriberCount.decrementAndGet(); } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) - throws - AMQException + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { if (_logger.isDebugEnabled()) { @@ -576,8 +525,8 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) == null) { - throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag - + " and protocol session key " + ps.getKey() + " not registered with queue " + this); + throw new AMQException(null, "Protocol session with channel " + channel + " and consumer tag " + consumerTag + + " and protocol session key " + ps.getKey() + " not registered with queue " + this, null); } removedSubscription.close(); @@ -609,21 +558,21 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return !_deliveryMgr.hasQueuedMessages(); } - public int delete(boolean checkUnused, boolean checkEmpty) - throws - AMQException + public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException { if (checkUnused && !_subscribers.isEmpty()) { _logger.info("Will not delete " + this + " as it is in use."); return 0; - } else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) + } + else if (checkEmpty && _deliveryMgr.hasQueuedMessages()) { _logger.info("Will not delete " + this + " as it is not empty."); return 0; - } else + } + else { delete(); @@ -631,9 +580,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - public void delete() - throws - AMQException + public void delete() throws AMQException { if (!_deleted.getAndSet(true)) { @@ -650,9 +597,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - protected void autodelete() - throws - AMQException + protected void autodelete() throws AMQException { if (_logger.isDebugEnabled()) { @@ -662,9 +607,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) - throws - AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -687,9 +630,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // return _deliveryMgr; // } - public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) - throws - AMQException + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try @@ -705,9 +646,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } } - void dequeue(StoreContext storeContext, AMQMessage msg) - throws - FailedDequeueException + void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException { try { @@ -737,9 +676,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return _subscribers; } - protected void updateReceivedMessageCount(AMQMessage msg) - throws - AMQException + protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException { if (!msg.isRedelivered()) { @@ -752,7 +689,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue } catch (JMException e) { - throw new AMQException("Unable to get notification from manage queue: " + e, e); + throw new AMQException(null, "Unable to get notification from manage queue: " + e, e); } } @@ -783,9 +720,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue return "Queue(" + _name + ")@" + System.identityHashCode(this); } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) - throws - AMQException + public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException { return _deliveryMgr.performGet(session, channel, acks); } @@ -802,9 +737,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue public static interface Task { - public void doTask(AMQQueue queue) - throws - AMQException; + public void doTask(AMQQueue queue) throws AMQException; } public void addQueueDeleteTask(Task task) @@ -837,9 +770,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); } - //======================================================================== + // ======================================================================== // Interface StorableQueue - //======================================================================== + // ======================================================================== public int getQueueID() { @@ -861,9 +794,9 @@ public class AMQQueue implements Managable, Comparable, StorableQueue _messages.put(m.getMessageId(), m); } - //======================================================================== + // ======================================================================== // Used by the Store - //======================================================================== + // ======================================================================== /** * Get the list of enqueud messages diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java new file mode 100644 index 0000000000..a5ff9e6326 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingExclusiveSubscriptionException signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingExclusiveSubscriptionException extends AMQException
+{
+ public ExistingExclusiveSubscriptionException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java new file mode 100644 index 0000000000..a13686eb56 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * ExistingSubscriptionPreventsExclusiveException signals a failure to create an exclusize subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public final class ExistingSubscriptionPreventsExclusiveException extends AMQException
+{
+ public ExistingSubscriptionPreventsExclusiveException()
+ {
+ super(null, "", null);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java index 6466e81dd2..d5c34152a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java @@ -38,13 +38,8 @@ import org.apache.qpid.AMQException; */ public class FailedDequeueException extends AMQException { - public FailedDequeueException(String queue) + public FailedDequeueException(String queue, Throwable cause) { - super("Failed to dequeue message from " + queue); - } - - public FailedDequeueException(String queue, AMQException e) - { - super("Failed to dequeue message from " + queue, e); + super(null, "Failed to dequeue message from " + queue, cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java index 090096d3c3..2fdd2791b1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java @@ -40,13 +40,8 @@ import org.apache.qpid.AMQException; */ public class MessageCleanupException extends AMQException { - public MessageCleanupException(long messageId, AMQException e) + public MessageCleanupException(String message, Throwable cause) { - super("Failed to cleanup message with id " + messageId, e); - } - - public MessageCleanupException(String message) - { - super(message); + super(null, message, cause); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index d6fd1eec89..afcdf062de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,9 +35,9 @@ import org.apache.qpid.server.RequiredDeliveryException; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(AMQMessage message) + public NoConsumersException(AMQMessage message, Throwable cause) { - super("Immediate delivery is not possible.", message); + super("Immediate delivery is not possible.", message, cause); } public AMQConstant getReplyCode() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java index 5539627820..560549c126 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java @@ -175,7 +175,7 @@ public class StorableMessageHandle implements AMQMessageHandle } } catch (Exception e) { - throw new AMQException("PRoblem during message enqueue", e); + throw new AMQException(null, "PRoblem during message enqueue", e); } } @@ -191,7 +191,7 @@ public class StorableMessageHandle implements AMQMessageHandle } } catch (Exception e) { - throw new AMQException("PRoblem during message dequeue", e); + throw new AMQException(null, "PRoblem during message dequeue", e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a7be9f2ad2..1cebf08fa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -108,7 +108,7 @@ public class SubscriptionImpl implements Subscription AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) { - throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session", null); } this.channel = channel; diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java b/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java deleted file mode 100644 index cec67a8a6d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.state; - -import org.apache.qpid.AMQException; - -/** - * @todo Not an AMQP exception as no status code. - * - * @todo Not used! Delete. - */ -public class IllegalStateTransitionException extends AMQException -{ - private AMQState _originalState; - - private Class _frame; - - public IllegalStateTransitionException(AMQState originalState, Class frame) - { - super("No valid state transition defined for receiving frame " + frame + " from state " + originalState); - _originalState = originalState; - _frame = frame; - } - - public AMQState getOriginalState() - { - return _originalState; - } - - public Class getFrameClass() - { - return _frame; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java index 05756a8c23..6c001485b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java @@ -87,7 +87,7 @@ public class DistributedTransactionalContext implements TransactionalContext _storeContext.setPayload(xid); } catch (Exception e) { - throw new AMQException("Problem during transaction begin", e); + throw new AMQException(null, "Problem during transaction begin", e); } } } @@ -105,7 +105,7 @@ public class DistributedTransactionalContext implements TransactionalContext } } catch (Exception e) { - throw new AMQException("Problem during transaction commit", e); + throw new AMQException(null, "Problem during transaction commit", e); } finally { @@ -125,7 +125,7 @@ public class DistributedTransactionalContext implements TransactionalContext } } catch (Exception e) { - throw new AMQException("Problem during transaction rollback", e); + throw new AMQException(null, "Problem during transaction rollback", e); } finally { @@ -152,7 +152,7 @@ public class DistributedTransactionalContext implements TransactionalContext _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst)); } catch (Exception e) { - throw new AMQException("Problem during transaction rollback", e); + throw new AMQException(null, "Problem during transaction rollback", e); } } @@ -196,7 +196,7 @@ public class DistributedTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null); } LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); @@ -219,7 +219,7 @@ public class DistributedTransactionalContext implements TransactionalContext if (msg == null) { _log.info("Single ack on delivery tag " + deliveryTag); - throw new AMQException("Single ack on delivery tag " + deliveryTag); + throw new AMQException(null, "Single ack on delivery tag " + deliveryTag, null); } if (_log.isDebugEnabled()) @@ -250,7 +250,7 @@ public class DistributedTransactionalContext implements TransactionalContext _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new DequeueRecord()); } catch (Exception e) { - throw new AMQException("Problem during message dequeue", e); + throw new AMQException(null, "Problem during message dequeue", e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 6d776eec0f..93459beb45 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -126,7 +126,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Ack with delivery tag " + deliveryTag + " not known for channel", null); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 496c94dae9..addb0c791f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -149,7 +149,7 @@ public class NonTransactionalContext implements TransactionalContext { if (!unacknowledgedMessageMap.contains(deliveryTag)) { - throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); + throw new AMQException(null, "Multiple ack on delivery tag " + deliveryTag + " not known for channel", null); } LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); @@ -182,8 +182,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); - throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + - _channel.getChannelId()); + throw new AMQException(null, "Single ack on delivery tag " + deliveryTag + " not known for channel:" + + _channel.getChannelId(), null); } if (!_browsedAcks.contains(deliveryTag)) |
