From 3b5d4734b777b54b52ce2710f404143aca8c5c6e Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Fri, 1 Jun 2007 14:33:07 +0000 Subject: QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 149 +- .../java/org/apache/qpid/client/AMQSession.java | 3613 +++++++++++--------- .../apache/qpid/client/BasicMessageConsumer.java | 162 +- .../org/apache/qpid/client/JMSAMQException.java | 28 +- .../qpid/client/failover/FailoverException.java | 26 +- .../qpid/client/failover/FailoverHandler.java | 135 +- .../qpid/client/failover/FailoverNoopSupport.java | 54 + .../failover/FailoverProtectedOperation.java | 30 + .../qpid/client/failover/FailoverRetrySupport.java | 130 + .../qpid/client/failover/FailoverSupport.java | 81 +- .../qpid/client/protocol/AMQProtocolHandler.java | 327 +- .../protocol/BlockingMethodFrameListener.java | 144 +- .../listener/SpecificMethodFrameListener.java | 2 +- .../client/util/FlowControllingBlockingQueue.java | 11 +- .../unit/client/channelclose/ChannelCloseTest.java | 136 +- .../qpid/test/unit/close/MessageRequeueTest.java | 74 +- .../test/unit/transacted/CommitRollbackTest.java | 31 +- .../apache/qpid/testutil/QpidClientConnection.java | 28 +- .../apache/qpid/AMQConnectionClosedException.java | 9 +- .../org/apache/qpid/AMQPInvalidClassException.java | 15 +- .../apache/qpid/protocol/AMQMethodListener.java | 4 +- .../org/apache/qpid/util/PrettyPrintingUtils.java | 2 + java/perftests/RunningPerformanceTests.txt | 2 +- java/perftests/pom.xml | 20 +- .../apache/qpid/server/failure/HeapExhaustion.java | 6 +- .../apache/qpid/testutil/QpidClientConnection.java | 271 -- .../qpid/testutil/QpidClientConnectionHelper.java | 275 ++ 27 files changed, 3228 insertions(+), 2537 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java delete mode 100644 java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java create mode 100644 java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 347f5728e2..674f205af6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,34 +20,15 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; @@ -67,6 +48,27 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = Logger.getLogger(AMQConnection.class); @@ -96,8 +98,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); - + private final Map _sessions = new LinkedHashMap(); private String _clientName; @@ -486,72 +487,72 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect final int prefetchHigh, final int prefetchLow) throws JMSException { checkNotClosed(); + if (channelLimitReached()) { throw new ChannelLimitReachedException(_maximumChannelCount); } - else - { - return (org.apache.qpid.jms.Session) new FailoverSupport() + + return new FailoverRetrySupport( + new FailoverProtectedOperation() + { + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - public Object operation() throws JMSException + int channelId = _idFactory.incrementAndGet(); + + if (_logger.isDebugEnabled()) { - int channelId = _idFactory.incrementAndGet(); + _logger.debug("Write channel open frame for channel id " + channelId); + } + + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); - if (_logger.isDebugEnabled()) + boolean success = false; + try + { + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) { - _logger.debug("Write channel open frame for channel id " + channelId); + deregisterSession(channelId); } + } - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - //_protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); - - boolean success = false; + if (_started) + { try { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; + session.start(); } catch (AMQException e) { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) - { - deregisterSession(channelId); - } + throw new JMSAMQException(e); } - - if (_started) - { - try - { - session.start(); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } - } - - return session; } - }.execute(this); - } + + return session; + } + }, this).execute(); } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) - throws AMQException + throws AMQException, FailoverException { // TODO: Be aware of possible changes to parameter order as versions change. @@ -581,7 +582,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException { try { @@ -1128,14 +1130,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. */ - public void resubscribeSessions() throws JMSException, AMQException + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { ArrayList sessions = new ArrayList(_sessions.values()); _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey? for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - //_protocolHandler.addSessionByChannel(s.getChannelId(), s); + // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,7 +1225,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _taskPool.execute(task); } - public AMQSession getSession(int channelId) { return _sessions.get(channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b7615c5b7b..25c2d94377 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,47 +20,16 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Arrays; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.message.JMSMapMessage; @@ -70,21 +39,20 @@ import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AccessRequestBody; -import org.apache.qpid.framing.AccessRequestOkBody; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.ExchangeDeclareBody; @@ -92,413 +60,624 @@ import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQSession.class); + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + + /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + /** The default minimum number of prefetched messages at which to resume the channel. */ public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; + /** + * The default value for immediate flag used by producers created by this session is false. That is, a consumer does + * not need to be attached to a queue. + */ + protected static final boolean DEFAULT_IMMEDIATE = false; + + /** + * The default value for mandatory flag used by producers created by this session is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + protected static final boolean DEFAULT_MANDATORY = true; + + /** System property to enable strict AMQP compliance. */ + public static final String STRICT_AMQP = "STRICT_AMQP"; + + /** Strict AMQP default setting. */ + public static final String STRICT_AMQP_DEFAULT = "false"; + + /** System property to enable failure if strict AMQP compliance is violated. */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + + /** Strickt AMQP failure default. */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + /** System property to enable immediate message prefetching. */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + + /** Immediate message prefetch default. */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + + /** The connection to which this session belongs. */ private AMQConnection _connection; + /** Used to indicate whether or not this is a transactional session. */ private boolean _transacted; + /** Holds the sessions acknowledgement mode. */ private int _acknowledgeMode; + /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; + /** @todo This does not appear to be set? */ private int _ticket; + /** Holds the high mark for prefetched message, at which the session is suspended. */ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + + /** Holds the low mark for prefetched messages, below which the session is resumed. */ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; + /** Used to indicate that this session has been started at least once. */ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client + * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client. */ private final ConcurrentHashMap _subscriptions = - new ConcurrentHashMap(); - private final ConcurrentHashMap _reverseSubscriptionMap = - new ConcurrentHashMap(); + new ConcurrentHashMap(); - /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ - private int _nextTag = 1; + /** + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. + */ + private final ConcurrentHashMap _reverseSubscriptionMap = + new ConcurrentHashMap(); - /** This queue is bounded and is used to store messages before being dispatched to the consumer */ + /** + * Used to hold incoming messages. + * + * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. + */ private final FlowControllingBlockingQueue _queue; + /** Holds the dispatcher thread for this session. */ private Dispatcher _dispatcher; + /** Holds the message factory factory for this session. */ private MessageFactoryRegistry _messageFactoryRegistry; - /** Set of all producers created by this session */ - private Map _producers = new ConcurrentHashMap(); - - /** Maps from consumer tag (String) to JMSMessageConsumer instance */ - private Map _consumers = new ConcurrentHashMap(); - - /** Maps from destination to count of JMSMessageConsumers */ - private ConcurrentHashMap _destinationConsumerCount = - new ConcurrentHashMap(); + /** Holds all of the producers created by this session, keyed by their unique identifiers. */ + private Map _producers = new ConcurrentHashMap(); /** - * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not - * need to be attached to a queue + * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume methods. */ - protected static final boolean DEFAULT_IMMEDIATE = false; + private int _nextTag = 1; /** - * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently - * drop messages where no queue is connected to the exchange for the message + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. */ - protected static final boolean DEFAULT_MANDATORY = true; + private Map _consumers = + new ConcurrentHashMap(); + + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ + private ConcurrentHashMap _destinationConsumerCount = + new ConcurrentHashMap(); /** - * The counter of the next producer id. This id is generated by the session and used only to allow the producer to - * identify itself to the session when deregistering itself.

Access to this id does not require to be - * synchronized since according to the JMS specification only one thread of control is allowed to create producers - * for any given session instance. + * Used as a source of unique identifiers for producers within the session. + * + *

Access to this id does not require to be synchronized since according to the JMS specification only one + * thread of control is allowed to create producers for any given session instance. */ private long _nextProducerId; - /** * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * onMessage() processing to enure that an auto ack is not sent. */ private boolean _inRecovery; + /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; + /** Used to indicate that this session has a message listener attached to it. */ private boolean _hasMessageListeners; + /** Used to indicate that this session has been suspended. */ private boolean _suspended; + /** + * Used to protect the suspension of this session, so that critical code can be executed during suspension, + * without the session being resumed by other threads. + */ private final Object _suspensionLock = new Object(); - /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ + /** + * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. + * + * @todo This is accessed only within a synchronized method, so does not need to be atomic. + */ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_DEFAULT = "false"; + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + private final boolean _immediatePrefetch; + /** Indicates that warnings should be generated on violations of the strict AMQP. */ private final boolean _strictAMQP; - /** System property to enable strickt AMQP compliance */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - /** Strickt AMQP default */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - + /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ private final boolean _strictAMQPFATAL; - /** System property to enable immediate message prefetching */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - /** Immediate message prefetch default */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - - private final boolean _immediatePrefetch; - - private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); - - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - private class Dispatcher extends Thread + /** + * Creates a new session on a connection. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param messageFactoryRegistry The message factory factory for the session. + * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + */ + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { - /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private final Object _lock = new Object(); + _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); + _strictAMQPFATAL = + Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); + _immediatePrefetch = + _strictAMQP + || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); - public Dispatcher() + _connection = con; + _transacted = transacted; + if (transacted) { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } + _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED; } - - public void run() + else { - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " started"); - } - - UnprocessedMessage message; + _acknowledgeMode = acknowledgeMode; + } - // Allow disptacher to start stopped - synchronized (_lock) - { - while (connectionStopped()) - { - try - { - _lock.wait(); - } - catch (InterruptedException e) - { - // ignore - } - } - } + _channelId = channelId; + _messageFactoryRegistry = messageFactoryRegistry; + _defaultPrefetchHighMark = defaultPrefetchHighMark; + _defaultPrefetchLowMark = defaultPrefetchLowMark; - try - { - while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) - { - synchronized (_lock) + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _queue = + new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() { - - while (connectionStopped()) + public void aboveThreshold(int currentValue) { - _lock.wait(); + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(true)).start(); + } } - dispatchMessage(message); - - while (connectionStopped()) + public void underThreshold(int currentValue) { - _lock.wait(); + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + new Thread(new SuspenderRunner(false)).start(); + } } + }); + } + else + { + _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); + } + } - } + /** + * Creates a new session on a connection with the default message factory factory. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + */ + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, + int defaultPrefetchLow) + { + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, + defaultPrefetchLow); + } - } - } - catch (InterruptedException e) - { - //ignore - } - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); - } + /** + * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. + * + * @throws IllegalStateException If the session is closed. + */ + public void acknowledge() throws IllegalStateException + { + if (isClosed()) + { + throw new IllegalStateException("Session is already closed"); } - // only call while holding lock - final boolean connectionStopped() + for (BasicMessageConsumer consumer : _consumers.values()) { - return _connectionStopped; + consumer.acknowledge(); } + } - boolean setConnectionStopped(boolean connectionStopped) - { - boolean currently; - synchronized (_lock) - { - currently = _connectionStopped; - _connectionStopped = connectionStopped; - _lock.notify(); + /** + * Acknowledge one or many messages. + * + * @param deliveryTag The tag of the last message to be acknowledged. + * @param multiple true to acknowledge all messages up to and including the one specified by the + * delivery tag, false to just acknowledge that message. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void acknowledgeMessage(long deliveryTag, boolean multiple) + { + final AMQFrame ackFrame = + BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + multiple); - if (_dispatcherLogger.isDebugEnabled()) - { - _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + - ": Currently " + (currently ? "Stopped" : "Started")); - } - } - return currently; + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - private void dispatchMessage(UnprocessedMessage message) - { - if (message.getDeliverBody() != null) - { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + getProtocolHandler().writeFrame(ackFrame); + } - if (consumer == null || consumer.isClosed()) - { - if (_dispatcherLogger.isInfoEnabled()) - { - if (consumer == null) - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - message.getDeliverBody().consumerTag + - " )without a handler - rejecting(requeue)..."); - } - else - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " + - " consumer(" + consumer.debugIdentity() + - ") is closed rejecting(requeue)..."); - } - } - // Don't reject if we're already closing - if (!_closed.get()) - { - rejectMessage(message, true); - } - } - else + /** + * Binds the named queue, with the specified routing key, to the named exchange. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? + */ + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName) throws AMQException + { + /*new FailoverRetrySupport(new FailoverProtectedOperation()*/ + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException { - consumer.notifyMessage(message, _channelId); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + + return null; } - } - } - - public void close() - { - _closed.set(true); - interrupt(); + }, _connection).execute(); + } - //fixme awaitTermination + /** + * Closes the session with no timeout. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + */ + public void close() throws JMSException + { + close(-1); + } + /** + * Closes the session. + * + *

Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * the channel. This is because the channel is marked as closed before the request to close it is made, so the + * fail-over should not re-open it. + * + * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + * + * @todo Be aware of possible changes to parameter order as versions change. + * + * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be + * re-opened. May need to examine this more carefully. + * + * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, + * because the failover process sends the failover event before acquiring the mutex itself. + */ + public void close(long timeout) throws JMSException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing session: " + this + ":" + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } - public void rollback() + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_connection.getFailoverMutex()) { - - synchronized (_lock) + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - boolean isStopped = connectionStopped(); + // we pass null since this is not an error case + closeProducersAndConsumers(null); - if (!isStopped) + try { - setConnectionStopped(true); - } - - rejectAllMessages(true); - _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); + getProtocolHandler().closeSession(this); - for (BasicMessageConsumer consumer : _consumers.values()) - { - if (!consumer.isNoConsume()) - { - consumer.rollback(); - } - else - { - // should perhaps clear the _SQ here. - //consumer._synchronousQueue.clear(); - consumer.clearReceiveQueue(); - } + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + finally + { + _connection.deregisterSession(_channelId); } - - setConnectionStopped(isStopped); } - } + } - public void rejectPending(BasicMessageConsumer consumer) + /** + * Called when the server initiates the closure of the session unilaterally. + * + * @param e the exception that caused this session to be closed. Null causes the + */ + public void closed(Throwable e) throws JMSException + { + synchronized (_connection.getFailoverMutex()) { - synchronized (_lock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + _closed.set(true); + AMQException amqe; + if (e instanceof AMQException) { - boolean stopped = _dispatcher.connectionStopped(); - - if (!stopped) - { - _dispatcher.setConnectionStopped(true); - } + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } - // Reject messages on pre-receive queue - consumer.rollback(); + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } + } - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + /** + * Commits all messages done in this transaction and releases any locks currently held. + * + *

If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the + * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the commit actually happened on the broker in this case. + * + * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does + * not mean that the commit is known to have failed, merely that it is not known whether it + * failed or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void commit() throws JMSException + { + checkTransacted(); - // closeConsumer - consumer.markClosed(); + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // need to send ack for messages delivered to consumers so far + for (Iterator i = _consumers.values().iterator(); i.hasNext();) + { + // Sends acknowledgement to server + i.next().acknowledgeLastDelivered(); + } - _dispatcher.setConnectionStopped(stopped); + // Commits outstanding messages sent and outstanding acknowledgements. + final AMQProtocolHandler handler = getProtocolHandler(); - } + handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), + TxCommitOkBody.class); + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); } } + public void confirmConsumerCancelled(AMQShortString consumerTag) + { + // Remove the consumer from the map + BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + if (consumer != null) + { + // fixme this isn't right.. needs to check if _queue contains data for this consumer + if (consumer.isAutoClose()) // && _queue.isEmpty()) + { + consumer.closeWhenNoMessages(true); + } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) - { + if (!consumer.isNoConsume()) + { + // Clean the Maps up first + // Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } + else + { + _logger.info("Dispatcher is null so created stopped dispatcher"); - _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); - _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); - _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); + startDistpatcherIfNecessary(true); + } - _connection = con; - _transacted = transacted; - if (transacted) - { - _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED; + _dispatcher.rejectPending(consumer); + } + else + { + // Just close the consumer + // fixme the CancelOK is being processed before the arriving messages.. + // The dispatcher is still to process them so the server sent in order but the client + // has yet to receive before the close comes in. + + // consumer.markClosed(); + } } else { - _acknowledgeMode = acknowledgeMode; + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } - _channelId = channelId; - _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; - if (_acknowledgeMode == NO_ACKNOWLEDGE) + } + + public QueueBrowser createBrowser(Queue queue) throws JMSException + { + if (isStrictAMQP()) { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); - } - } - }); + throw new UnsupportedOperationException(); } - else + + return createBrowser(queue, null); + } + + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + { + if (isStrictAMQP()) { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); + throw new UnsupportedOperationException(); } - } + checkNotClosed(); + checkValidQueue(queue); - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); + return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public AMQConnection getAMQConnection() + public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException { - return _connection; + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, true, true); } public BytesMessage createBytesMessage() throws JMSException @@ -506,1506 +685,1560 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized (_connection.getFailoverMutex()) { checkNotClosed(); + return new JMSBytesMessage(); } } - public MapMessage createMapMessage() throws JMSException + public MessageConsumer createConsumer(Destination destination) throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - return new JMSMapMessage(); - } - } + checkValidDestination(destination); - public javax.jms.Message createMessage() throws JMSException - { - return createBytesMessage(); + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null, + false, false); } - public ObjectMessage createObjectMessage() throws JMSException + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(); - } - } + checkValidDestination(destination); - public ObjectMessage createObjectMessage(Serializable object) throws JMSException - { - ObjectMessage msg = createObjectMessage(); - msg.setObject(object); - return msg; + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, + messageSelector, null, false, false); } - public StreamMessage createStreamMessage() throws JMSException + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); + checkValidDestination(destination); - return new JMSStreamMessage(); - } + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + messageSelector, null, false, false); } - public TextMessage createTextMessage() throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector) throws JMSException { - synchronized (_connection.getFailoverMutex()) - { - checkNotClosed(); + checkValidDestination(destination); - return new JMSTextMessage(); - } + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); } - public TextMessage createTextMessage(String text) throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector) throws JMSException { + checkValidDestination(destination); - TextMessage msg = createTextMessage(); - msg.setText(text); - return msg; + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); } - public boolean getTransacted() throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, + String selector, FieldTable rawSelector) throws JMSException { - checkNotClosed(); - return _transacted; + checkValidDestination(destination); + + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); } - public int getAcknowledgeMode() throws JMSException + public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, + boolean exclusive, String selector, FieldTable rawSelector) throws JMSException { - checkNotClosed(); - return _acknowledgeMode; + checkValidDestination(destination); + + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, + false); } - public void commit() throws JMSException + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { - checkTransacted(); - try + + checkNotClosed(); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) { - // Acknowledge up to message last delivered (if any) for each consumer. - //need to send ack for messages delivered to consumers so far - for (Iterator i = _consumers.values().iterator(); i.hasNext();) + if (subscriber.getTopic().equals(topic)) { - //Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); } - - // Commits outstanding messages sent and outstanding acknowledgements. - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQProtocolHandler handler = getProtocolHandler(); - - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion()), - TxCommitOkBody.class); - } - catch (AMQException e) - { - JMSException exception = new JMSException("Failed to commit: " + e.getMessage()); - exception.setLinkedException(e); - throw exception; } - } - - - public void rollback() throws JMSException - { - synchronized (_suspensionLock) + else { - checkTransacted(); - try + AMQShortString topicName; + if (topic instanceof AMQTopic) { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - - boolean isSuspended = isSuspended(); + topicName = ((AMQTopic) topic).getDestinationName(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } - if (!isSuspended) + if (_strictAMQP) + { + if (_strictAMQPFATAL) { - suspendChannel(true); + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); } - - if (_dispatcher != null) + else { - _dispatcher.rollback(); + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - - - if (!isSuspended) - { - suspendChannel(false); - } + deleteQueue(dest.getAMQQueueName()); } - catch (AMQException e) + else { - throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } } } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; } - public void close() throws JMSException + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException { - close(-1); + checkNotClosed(); + checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; } - public void close(long timeout) throws JMSException + public MapMessage createMapMessage() throws JMSException { - if (_logger.isInfoEnabled()) + synchronized (_connection.getFailoverMutex()) { - _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + checkNotClosed(); + + return new JMSMapMessage(); } + } - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session + public javax.jms.Message createMessage() throws JMSException + { + return createBytesMessage(); + } + + public ObjectMessage createObjectMessage() throws JMSException + { synchronized (_connection.getFailoverMutex()) { - //Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) - { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try - { + checkNotClosed(); - getProtocolHandler().closeSession(this); - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + return (ObjectMessage) new JMSObjectMessage(); + } + } - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully + public ObjectMessage createObjectMessage(Serializable object) throws JMSException + { + ObjectMessage msg = createObjectMessage(); + msg.setObject(object); - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - _connection.deregisterSession(_channelId); - } - } - } + return msg; } - private AMQProtocolHandler getProtocolHandler() + public BasicMessageProducer createProducer(Destination destination) throws JMSException { - return _connection.getProtocolHandler(); + return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } + public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException + { + return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + } - private byte getProtocolMinorVersion() + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + throws JMSException { - return getProtocolHandler().getProtocolMinorVersion(); + return createProducerImpl(destination, mandatory, immediate); } - private byte getProtocolMajorVersion() + public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + boolean waitUntilSent) throws JMSException { - return getProtocolHandler().getProtocolMajorVersion(); + return createProducerImpl(destination, mandatory, immediate, waitUntilSent); } + public TopicPublisher createPublisher(Topic topic) throws JMSException + { + checkNotClosed(); - /** - * Close all producers or consumers. This is called either in the error case or when closing the session normally. - * - * @param amqe the exception, may be null to indicate no error has occurred - */ - private void closeProducersAndConsumers(AMQException amqe) throws JMSException + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + } + + public Queue createQueue(String queueName) throws JMSException { - JMSException jmse = null; - try - { - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - jmse = e; - } - try + checkNotClosed(); + if (queueName.indexOf('/') == -1) { - closeConsumers(amqe); + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); } - catch (JMSException e) + else { - _logger.error("Error closing session: " + e, e); - if (jmse == null) + try { - jmse = e; + return new AMQQueue(new AMQBindingURL(queueName)); } - } - if (jmse != null) - { - throw jmse; - } - } - + catch (URLSyntaxException urlse) + { + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); - public boolean isSuspended() - { - return _suspended; + throw jmse; + } + } } - /** - * Called when the server initiates the closure of the session unilaterally. + * Declares the named queue. * - * @param e the exception that caused this session to be closed. Null causes the + *

Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - public void closed(Throwable e) throws JMSException + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive) throws AMQException { - synchronized (_connection.getFailoverMutex()) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) + new FailoverRetrySupport(new FailoverProtectedOperation() { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); - } + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return null; + } + }, _connection).execute(); } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover - * when the client has veoted resubscription.

The caller of this method must already hold the failover mutex. + * Creates a QueueReceiver + * + * @param destination + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException */ - void markClosed() + public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { - _closed.set(true); - _connection.deregisterSession(_channelId); - markClosedProducersAndConsumers(); + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + return new QueueReceiverAdaptor(dest, consumer); } - private void markClosedProducersAndConsumers() + /** + * Creates a QueueReceiver using a message selector + * + * @param destination + * @param messageSelector + * + * @return QueueReceiver - a wrapper around our MessageConsumer + * + * @throws JMSException + */ + public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - try - { - markClosedConsumers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } + checkValidDestination(destination); + AMQQueue dest = (AMQQueue) destination; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + + return new QueueReceiverAdaptor(dest, consumer); } /** - * Called to close message producers cleanly. This may or may not be as a result of an error. There is - * currently no way of propagating errors to message producers (this is a JMS limitation). + * Creates a QueueReceiver wrapping a MessageConsumer + * + * @param queue + * + * @return QueueReceiver + * + * @throws JMSException */ - private void closeProducers() throws JMSException + public QueueReceiver createReceiver(Queue queue) throws JMSException { - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - final ArrayList clonedProducers = new ArrayList(_producers.values()); + checkNotClosed(); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); - final Iterator it = clonedProducers.iterator(); - while (it.hasNext()) - { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); - prod.close(); - } - // at this point the _producers map is empty + return new QueueReceiverAdaptor(dest, consumer); } /** - * Called to close message consumers cleanly. This may or may not be as a result of an error. + * Creates a QueueReceiver wrapping a MessageConsumer using a message selector * - * @param error not null if this is a result of an error occurring at the connection level + * @param queue + * @param messageSelector + * + * @return QueueReceiver + * + * @throws JMSException */ - private void closeConsumers(Throwable error) throws JMSException + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + checkNotClosed(); + AMQQueue dest = (AMQQueue) queue; + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); - final Iterator it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final BasicMessageConsumer con = it.next(); - if (error != null) - { - con.notifyError(error); - } - else - { - con.close(); - } - } - // at this point the _consumers map will be empty + return new QueueReceiverAdaptor(dest, consumer); } - private void markClosedConsumers() throws JMSException + public QueueSender createSender(Queue queue) throws JMSException { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + checkNotClosed(); - final Iterator it = clonedConsumers.iterator(); - while (it.hasNext()) + // return (QueueSender) createProducer(queue); + return new QueueSenderAdapter(createProducer(queue), queue); + } + + public StreamMessage createStreamMessage() throws JMSException + { + synchronized (_connection.getFailoverMutex()) { - final BasicMessageConsumer con = it.next(); - con.markClosed(); + checkNotClosed(); + + return new JMSStreamMessage(); } - // at this point the _consumers map will be empty } /** - * Asks the broker to resend all unacknowledged messages for the session. + * Creates a non-durable subscriber + * + * @param topic + * + * @return TopicSubscriber - a wrapper round our MessageConsumer + * + * @throws JMSException + */ + public TopicSubscriber createSubscriber(Topic topic) throws JMSException + { + checkNotClosed(); + AMQTopic dest = checkValidTopic(topic); + + // AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + } + + /** + * Creates a non-durable subscriber with a message selector + * + * @param topic + * @param messageSelector + * @param noLocal + * + * @return TopicSubscriber - a wrapper round our MessageConsumer * * @throws JMSException */ - public void recover() throws JMSException + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); - checkNotTransacted(); // throws IllegalStateException if a transacted session - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; - try - { + AMQTopic dest = checkValidTopic(topic); - boolean isSuspended = isSuspended(); + // AMQTopic dest = new AMQTopic(topic.getTopicName()); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); + } - if (!isSuspended) - { - suspendChannel(true); - } + public TemporaryQueue createTemporaryQueue() throws JMSException + { + checkNotClosed(); - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.clearUnackedMessages(); - } + return new AMQTemporaryQueue(this); + } - if (_dispatcher != null) - { - _dispatcher.rollback(); - } + public TemporaryTopic createTemporaryTopic() throws JMSException + { + checkNotClosed(); - if (isStrictAMQP()) - { - // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); - } - else - { + return new AMQTemporaryTopic(this); + } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); - } - if (!isSuspended) - { - suspendChannel(false); - } - } - catch (AMQException e) + public TextMessage createTextMessage() throws JMSException + { + synchronized (_connection.getFailoverMutex()) { - throw new JMSAMQException(e); + checkNotClosed(); + + return new JMSTextMessage(); } } - boolean isInRecovery() + public TextMessage createTextMessage(String text) throws JMSException { - return _inRecovery; - } - void setInRecovery(boolean inRecovery) - { - _inRecovery = inRecovery; + TextMessage msg = createTextMessage(); + msg.setText(text); + + return msg; } - public void acknowledge() throws JMSException + public Topic createTopic(String topicName) throws JMSException { - if (isClosed()) + checkNotClosed(); + + if (topicName.indexOf('/') == -1) { - throw new IllegalStateException("Session is already closed"); + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); } - for (BasicMessageConsumer consumer : _consumers.values()) + else { - consumer.acknowledge(); - } - + try + { + return new AMQTopic(new AMQBindingURL(topicName)); + } + catch (URLSyntaxException urlse) + { + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + throw jmse; + } + } } - - public MessageListener getMessageListener() throws JMSException + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { -// checkNotClosed(); - return _messageListener; + declareExchange(name, type, getProtocolHandler(), nowait); } - public void setMessageListener(MessageListener listener) throws JMSException + public int getAcknowledgeMode() throws JMSException { -// checkNotClosed(); -// -// if (_dispatcher != null && !_dispatcher.connectionStopped()) -// { -// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); -// } -// -// // We are stopped -// for (Iterator i = _consumers.values().iterator(); i.hasNext();) -// { -// BasicMessageConsumer consumer = i.next(); -// -// if (consumer.isReceiving()) -// { -// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); -// } -// } -// -// _messageListener = listener; -// -// for (Iterator i = _consumers.values().iterator(); i.hasNext();) -// { -// i.next().setMessageListener(_messageListener); -// } + checkNotClosed(); + return _acknowledgeMode; } - public void run() + public AMQConnection getAMQConnection() { - throw new java.lang.UnsupportedOperationException(); + return _connection; } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException + public int getChannelId() { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); + return _channelId; } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException + public int getDefaultPrefetch() { - return createProducerImpl(destination, mandatory, immediate); + return _defaultPrefetchHighMark; } - public BasicMessageProducer createProducer(Destination destination, boolean immediate) - throws JMSException + public int getDefaultPrefetchHigh() { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + return _defaultPrefetchHighMark; } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public int getDefaultPrefetchLow() { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + return _defaultPrefetchLowMark; } - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) - throws JMSException + public AMQShortString getDefaultQueueExchangeName() { - return createProducerImpl(destination, mandatory, immediate, false); + return _connection.getDefaultQueueExchangeName(); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) - throws JMSException + public AMQShortString getDefaultTopicExchangeName() { - return (BasicMessageProducer) new FailoverSupport() - { - public Object operation() throws JMSException - { - checkNotClosed(); - long producerId = getNextProducerId(); - BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, getProtocolHandler(), - producerId, immediate, mandatory, waitUntilSent); - registerProducer(producerId, producer); - return producer; - } - }.execute(_connection); + return _connection.getDefaultTopicExchangeName(); } - /** - * Creates a QueueReceiver - * - * @param destination - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException + public MessageListener getMessageListener() throws JMSException { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); - return new QueueReceiverAdaptor(dest, consumer); + // checkNotClosed(); + return _messageListener; } - /** - * Creates a QueueReceiver using a message selector - * - * @param destination - * @param messageSelector - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException + public AMQShortString getTemporaryQueueExchangeName() { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(destination, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); + return _connection.getTemporaryQueueExchangeName(); } - public MessageConsumer createConsumer(Destination destination) throws JMSException + public AMQShortString getTemporaryTopicExchangeName() { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - null, - null, - false, - false); + return _connection.getTemporaryTopicExchangeName(); } - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException + public int getTicket() { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - false, - false, - messageSelector, - null, - false, - false); + return _ticket; } - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - false, - false); - } - - public MessageConsumer createBrowserConsumer(Destination destination, - String messageSelector, - boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - return createConsumerImpl(destination, - _defaultPrefetchHighMark, - _defaultPrefetchLowMark, - noLocal, - false, - messageSelector, - null, - true, - true); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + public boolean getTransacted() throws JMSException { - checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); - } + checkNotClosed(); + return _transacted; + } - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException + public boolean hasConsumer(Destination destination) { - checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); + AtomicInteger counter = _destinationConsumerCount.get(destination); + + return (counter != null) && (counter.get() != 0); } - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public boolean isStrictAMQP() { - checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector, false, false); + return _strictAMQP; } - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException + public boolean isSuspended() { - checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector, false, false); + return _suspended; } - protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetchHigh, - final int prefetchLow, - final boolean noLocal, - final boolean exclusive, - String selector, - final FieldTable rawSelector, - final boolean noConsume, - final boolean autoClose) throws JMSException + /** + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. + * + * @param message the message that has been received + */ + public void messageReceived(UnprocessedMessage message) { - checkTemporaryDestination(destination); - - final String messageSelector; - - if (_strictAMQP && !(selector == null || selector.equals(""))) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); - } - else - { - messageSelector = null; - } - } - else + if (_logger.isDebugEnabled()) { - messageSelector = selector; + _logger.debug("Message[" + + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody())) + + "] received in session with channel id " + _channelId); } - return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() + if (message.getDeliverBody() == null) { - public Object operation() throws JMSException - { - checkNotClosed(); - - AMQDestination amqd = (AMQDestination) destination; - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - // TODO: Define selectors in AMQP - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = FieldTableFactory.newFieldTable(); - //if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) - { - ft.addAll(rawSelector); - } - - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, - _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode, noConsume, autoClose); - - if (_messageListener != null) - { - consumer.setMessageListener(_messageListener); - } - - try - { - registerConsumer(consumer, false); - } - catch (AMQInvalidArgumentException ise) - { - JMSException ex = new InvalidSelectorException(ise.getMessage()); - ex.setLinkedException(ise); - throw ex; - } - catch (AMQInvalidRoutingKeyException e) - { - JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); - ide.setLinkedException(e); - throw ide; - } - catch (AMQException e) - { - JMSException ex = new JMSException("Error registering consumer: " + e); - - if (_logger.isDebugEnabled()) - { - e.printStackTrace(); - } - ex.setLinkedException(e); - throw ex; - } - - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } - - return consumer; - } - }.execute(_connection); + // Return of the bounced message. + returnBouncedMessage(message); + } + else + { + _queue.add(message); + } } - private void checkTemporaryDestination(Destination destination) - throws JMSException + /** + * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. + * + *

All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all + * messages that have been delivered to the client. + * + *

Restarting a session causes it to take the following actions: + * + *

    + *
  • Stop message delivery.
  • + *
  • Mark all messages that might have been delivered but not acknowledged as "redelivered". + *
  • Restart the delivery sequence including all unacknowledged messages that had been previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order.
  • + *
+ * + *

If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and + * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible + * for the client to determine whether the broker is going to recover the session or not. + * + * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. + * Not that this does not necessarily mean that the recovery has failed, but simply that it + * is not possible to tell if it has or not. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void recover() throws JMSException { - if ((destination instanceof TemporaryDestination)) + // Ensure that the session is open. + checkNotClosed(); + + // Ensure that the session is not transacted. + checkNotTransacted(); + + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; + try { - _logger.debug("destination is temporary"); - final TemporaryDestination tempDest = (TemporaryDestination) destination; - if (tempDest.getSession() != this) + + boolean isSuspended = isSuspended(); + + if (!isSuspended) { - _logger.debug("destination is on different session"); - throw new JMSException("Cannot consume from a temporary destination created onanother session"); + suspendChannel(true); } - if (tempDest.isDeleted()) + + for (BasicMessageConsumer consumer : _consumers.values()) { - _logger.debug("destination is deleted"); - throw new JMSException("Cannot consume from a deleted destination"); + consumer.clearUnackedMessages(); } - } - } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } - public boolean hasConsumer(Destination destination) - { - AtomicInteger counter = _destinationConsumerCount.get(destination); + if (isStrictAMQP()) + { + // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. + _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + } + else + { - return (counter != null) && (counter.get() != 0); - } + _connection.getProtocolHandler().syncWrite( + BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue + , BasicRecoverOkBody.class); + } - public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException - { - declareExchange(name, type, getProtocolHandler(), nowait); + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw new JMSAMQException("Recover failed: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); + } } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + public void rejectMessage(UnprocessedMessage message, boolean requeue) { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); - } - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - nowait, // nowait - false, // passive - getTicket(), // ticket - type); // type + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + rejectMessage(message.getDeliverBody().deliveryTag, requeue); } - - public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException + public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + } - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + rejectMessage(message.getDeliveryTag(), requeue); } - - public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException + public void rejectMessage(long deliveryTag, boolean requeue) { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + AMQFrame basicRejectBody = + BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag, + requeue); - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } /** - * Declare the queue. + * Commits all messages done in this transaction and releases any locks currently held. * - * @param amqd - * @param protocolHandler + *

If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the + * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. * - * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. + * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does + * not mean that the rollback is known to have failed, merely that it is not known whether it + * failed or not. * - * @throws AMQException + * @todo Be aware of possible changes to parameter order as versions change. */ - private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + public void rollback() throws JMSException { - // For queues (but not topics) we generate the name in the client rather than the - // server. This allows the name to be reused on failover if required. In general, - // the destination indicates whether it wants a name generated or not. - if (amqd.isNameRequired()) + synchronized (_suspensionLock) { - amqd.setQueueName(protocolHandler.generateQueueName()); - } + checkTransacted(); - //TODO verify the destiation is valid. else throw + try + { + boolean isSuspended = isSuspended(); - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket + if (!isSuspended) + { + suspendChannel(true); + } - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); - return amqd.getAMQQueueName(); - } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } - private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket + _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to rollback: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); + } + } + } - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + public void run() + { + throw new java.lang.UnsupportedOperationException(); } - /** - * Register to consume from the queue. - * - * @param queueName - * - * @return the consumer tag generated by the broker - */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector) throws AMQException + public void setMessageListener(MessageListener listener) throws JMSException { - //need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); + // checkNotClosed(); + // + // if (_dispatcher != null && !_dispatcher.connectionStopped()) + // { + // throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); + // } + // + // // We are stopped + // for (Iterator i = _consumers.values().iterator(); i.hasNext();) + // { + // BasicMessageConsumer consumer = i.next(); + // + // if (consumer.isReceiving()) + // { + // throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); + // } + // } + // + // _messageListener = listener; + // + // for (Iterator i = _consumers.values().iterator(); i.hasNext();) + // { + // i.next().setMessageListener(_messageListener); + // } + + } + + /*public void setTicket(int ticket) + { + _ticket = ticket; + }*/ - FieldTable arguments = FieldTableFactory.newFieldTable(); - if (messageSelector != null && !messageSelector.equals("")) - { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); - } - if (consumer.isAutoClose()) + public void unsubscribe(String name) throws JMSException + { + checkNotClosed(); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) { - arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); + // send a queue.delete for the subscription + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + _subscriptions.remove(name); + _reverseSubscriptionMap.remove(subscriber); } - if (consumer.isNoConsume()) + else { - arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + + " Requesting queue deletion regardless."); + } + + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } + } } + } - consumer.setConsumerTag(tag); - // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException + { + checkTemporaryDestination(destination); - try + final String messageSelector; + + if (_strictAMQP && !((selector == null) || selector.equals(""))) { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - if (nowait) + if (_strictAMQPFATAL) { - protocolHandler.writeFrame(jmsConsume); + throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + messageSelector = null; } } - catch (AMQException e) + else { - // clean-up the map in the event of an error - _consumers.remove(tag); - throw e; + messageSelector = selector; } + + return new FailoverRetrySupport( + new FailoverProtectedOperation() + { + public MessageConsumer execute() throws JMSException, FailoverException + { + checkNotClosed(); + + AMQDestination amqd = (AMQDestination) destination; + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + // TODO: Define selectors in AMQP + // TODO: construct the rawSelector from the selector string if rawSelector == null + final FieldTable ft = FieldTableFactory.newFieldTable(); + // if (rawSelector != null) + // ft.put("headers", rawSelector.getDataAsBytes()); + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + BasicMessageConsumer consumer = + new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, + _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, + exclusive, _acknowledgeMode, noConsume, autoClose); + + if (_messageListener != null) + { + consumer.setMessageListener(_messageListener); + } + + try + { + registerConsumer(consumer, false); + } + catch (AMQInvalidArgumentException ise) + { + JMSException ex = new InvalidSelectorException(ise.getMessage()); + ex.setLinkedException(ise); + throw ex; + } + catch (AMQInvalidRoutingKeyException e) + { + JMSException ide = + new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); + ide.setLinkedException(e); + throw ide; + } + catch (AMQException e) + { + JMSException ex = new JMSException("Error registering consumer: " + e); + + if (_logger.isDebugEnabled()) + { + e.printStackTrace(); + } + + ex.setLinkedException(e); + throw ex; + } + + synchronized (destination) + { + _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); + _destinationConsumerCount.get(destination).incrementAndGet(); + } + + return consumer; + } + }, _connection).execute(); } - public Queue createQueue(String queueName) throws JMSException + /** + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. + * + * @param consumer the consum + */ + void deregisterConsumer(BasicMessageConsumer consumer) { - checkNotClosed(); - if (queueName.indexOf('/') == -1) - { - return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); - } - else + if (_consumers.remove(consumer.getConsumerTag()) != null) { - try - { - return new AMQQueue(new AMQBindingURL(queueName)); - } - catch (URLSyntaxException urlse) + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); + _subscriptions.remove(subscriptionName); + } - throw jmse; + Destination dest = consumer.getDestination(); + synchronized (dest) + { + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } } } } - public AMQShortString getDefaultQueueExchangeName() + void deregisterProducer(long producerId) { - return _connection.getDefaultQueueExchangeName(); + _producers.remove(new Long(producerId)); } - /** - * Creates a QueueReceiver wrapping a MessageConsumer - * - * @param queue - * - * @return QueueReceiver - * - * @throws JMSException - */ - public QueueReceiver createReceiver(Queue queue) throws JMSException + boolean isInRecovery() { - checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); - return new QueueReceiverAdaptor(dest, consumer); + return _inRecovery; + } + + boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + { + return isQueueBound(exchangeName, queueName, null); } /** - * Creates a QueueReceiver wrapping a MessageConsumer using a message selector + * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key. * - * @param queue - * @param messageSelector + *

Note that this operation automatically retries in the event of fail-over. * - * @return QueueReceiver + * @param exchangeName The exchange name to test for binding against. + * @param queueName The queue name to check if bound. + * @param routingKey The routing key to check if the queue is bound under. * - * @throws JMSException + * @return true if the queue is bound to the exchange and routing key, false if not. + * + * @throws JMSException If the query fails for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException + boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException { - checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(dest, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); - } + try + { + AMQMethodEvent response = + new FailoverRetrySupport( + new FailoverProtectedOperation() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + AMQFrame boundFrame = + ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), + getProtocolMinorVersion(), exchangeName, // exchange + queueName, // queue + routingKey); // routingKey - public QueueSender createSender(Queue queue) throws JMSException - { - checkNotClosed(); - //return (QueueSender) createProducer(queue); - return new QueueSenderAdapter(createProducer(queue), queue); - } + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - public Topic createTopic(String topicName) throws JMSException - { - checkNotClosed(); + } + }, _connection).execute(); - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.replyCode == 0); } - else + catch (AMQException e) { - try - { - return new AMQTopic(new AMQBindingURL(topicName)); - } - catch (URLSyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - - throw jmse; - } + throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } } - public AMQShortString getDefaultTopicExchangeName() - { - return _connection.getDefaultTopicExchangeName(); - } - /** - * Creates a non-durable subscriber - * - * @param topic - * - * @return TopicSubscriber - a wrapper round our MessageConsumer - * - * @throws JMSException + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription.

The caller of this method must already hold the failover mutex. */ - public TopicSubscriber createSubscriber(Topic topic) throws JMSException + void markClosed() { - checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + _closed.set(true); + _connection.deregisterSession(_channelId); + markClosedProducersAndConsumers(); + } /** - * Creates a non-durable subscriber with a message selector - * - * @param topic - * @param messageSelector - * @param noLocal - * - * @return TopicSubscriber - a wrapper round our MessageConsumer + * Resubscribes all producers and consumers. This is called when performing failover. * - * @throws JMSException + * @throws AMQException */ - public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException + void resubscribe() throws AMQException { - checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); - //AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); + resubscribeProducers(); + resubscribeConsumers(); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + void setHasMessageListeners() { + _hasMessageListeners = true; + } + void setInRecovery(boolean inRecovery) + { + _inRecovery = inRecovery; + } - checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) + /** + * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. + * + * @throws AMQException If the session cannot be started for any reason. + * + * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. + */ + void start() throws AMQException + { + // Check if the session has perviously been started and suspended, in which case it must be unsuspended. + if (_startedAtLeastOnce.getAndSet(true)) { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + - name); - } - else - { - unsubscribe(name); - } + suspendChannel(false); } - else + + // If the event dispatcher is not running then start it too. + if (hasMessageListeners()) { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getDestinationName(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } + startDistpatcherIfNecessary(); + } + } - if (_strictAMQP) + synchronized void startDistpatcherIfNecessary() + { + // If IMMEDIATE_PREFETCH is not set then we need to start fetching + if (!_immediatePrefetch) + { + // We do this now if this is the first call on a started connection + if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else + try { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); + suspendChannel(false); } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + catch (AMQException e) { - deleteQueue(dest.getAMQQueueName()); + _logger.info("Unsuspending channel threw an exception:" + e); } } } - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; + startDistpatcherIfNecessary(false); } - void deleteQueue(AMQShortString queueName) throws JMSException + synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) { - try + if (_dispatcher == null) { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + _dispatcher = new Dispatcher(); + _dispatcher.setDaemon(true); + _dispatcher.setConnectionStopped(initiallyStopped); + _dispatcher.start(); } - catch (AMQException e) + else { - throw new JMSAMQException(e); + _dispatcher.setConnectionStopped(initiallyStopped); } } - /** Note, currently this does not handle reuse of the same name with different topics correctly. */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + void stop() throws AMQException { - checkNotClosed(); - checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - return subscriber; + // Stop the server delivering messages to this session. + suspendChannel(true); + + if (_dispatcher != null) + { + _dispatcher.setConnectionStopped(true); + } } - public TopicPublisher createPublisher(Topic topic) throws JMSException + /* + * Binds the named queue, with the specified routing key, to the named exchange. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param queueName The name of the queue to bind. + * @param routingKey The routing key to bind the queue with. + * @param arguments Additional arguments. + * @param exchangeName The exchange to bind the queue on. + * + * @throws AMQException If the queue cannot be bound for any reason. + */ + /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) + throws AMQException, FailoverException { - checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + AMQFrame queueBind = + QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments + amqd.getExchangeName(), // exchange + false, // nowait + queueName, // queue + amqd.getRoutingKey(), // routingKey + getTicket()); // ticket + + protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + }*/ + + private void checkNotTransacted() throws JMSException + { + if (getTransacted()) + { + throw new IllegalStateException("Session is transacted"); + } + } + + private void checkTemporaryDestination(Destination destination) throws JMSException + { + if ((destination instanceof TemporaryDestination)) + { + _logger.debug("destination is temporary"); + final TemporaryDestination tempDest = (TemporaryDestination) destination; + if (tempDest.getSession() != this) + { + _logger.debug("destination is on different session"); + throw new JMSException("Cannot consume from a temporary destination created onanother session"); + } + + if (tempDest.isDeleted()) + { + _logger.debug("destination is deleted"); + throw new JMSException("Cannot consume from a deleted destination"); + } + } } - public QueueBrowser createBrowser(Queue queue) throws JMSException + private void checkTransacted() throws JMSException { - if (isStrictAMQP()) + if (!getTransacted()) { - throw new UnsupportedOperationException(); + throw new IllegalStateException("Session is not transacted"); } - - return createBrowser(queue, null); } - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException + private void checkValidDestination(Destination destination) throws InvalidDestinationException { - if (isStrictAMQP()) + if (destination == null) { - throw new UnsupportedOperationException(); + throw new javax.jms.InvalidDestinationException("Invalid Queue"); } - - checkNotClosed(); - checkValidQueue(queue); - return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); } - public TemporaryQueue createTemporaryQueue() throws JMSException + private void checkValidQueue(Queue queue) throws InvalidDestinationException { - checkNotClosed(); - return new AMQTemporaryQueue(this); + if (queue == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } } - public TemporaryTopic createTemporaryTopic() throws JMSException + /* + * I could have combined the last 3 methods, but this way it improves readability + */ + private AMQTopic checkValidTopic(Topic topic) throws JMSException { - checkNotClosed(); - return new AMQTemporaryTopic(this); + if (topic == null) + { + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } + + if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on a temporary topic created in another session"); + } + + if (!(topic instanceof AMQTopic)) + { + throw new javax.jms.InvalidDestinationException( + "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + + topic.getClass().getName()); + } + + return (AMQTopic) topic; } - public void unsubscribe(String name) throws JMSException + /** + * Called to close message consumers cleanly. This may or may not be as a result of an error. + * + * @param error not null if this is a result of an error occurring at the connection level + */ + private void closeConsumers(Throwable error) throws JMSException { - checkNotClosed(); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) + if (_dispatcher != null) { - // send a queue.delete for the subscription - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - _subscriptions.remove(name); - _reverseSubscriptionMap.remove(subscriber); + _dispatcher.close(); + _dispatcher = null; } - else + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + + final Iterator it = clonedConsumers.iterator(); + while (it.hasNext()) { - if (_strictAMQP) + final BasicMessageConsumer con = it.next(); + if (error != null) { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); - } - - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + con.notifyError(error); } else { - - if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) - { - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - } - else - { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); - } + con.close(); } } + // at this point the _consumers map will be empty } - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + /** + * Called to close message producers cleanly. This may or may not be as a result of an error. There is + * currently no way of propagating errors to message producers (this is a JMS limitation). + */ + private void closeProducers() throws JMSException { - return isQueueBound(exchangeName, queueName, null); + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + final ArrayList clonedProducers = new ArrayList(_producers.values()); + + final Iterator it = clonedProducers.iterator(); + while (it.hasNext()) + { + final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + prod.close(); + } + // at this point the _producers map is empty } - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException + /** + * Close all producers or consumers. This is called either in the error case or when closing the session normally. + * + * @param amqe the exception, may be null to indicate no error has occurred + */ + private void closeProducersAndConsumers(AMQException amqe) throws JMSException { - - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - exchangeName, // exchange - queueName, // queue - routingKey); // routingKey - AMQMethodEvent response = null; + JMSException jmse = null; try { - response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + closeProducers(); } - catch (AMQException e) + catch (JMSException e) { - throw new JMSAMQException(e); + _logger.error("Error closing session: " + e, e); + jmse = e; } - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency - } - private void checkTransacted() throws JMSException - { - if (!getTransacted()) + try { - throw new IllegalStateException("Session is not transacted"); + closeConsumers(amqe); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); + if (jmse == null) + { + jmse = e; + } } - } - private void checkNotTransacted() throws JMSException - { - if (getTransacted()) + if (jmse != null) { - throw new IllegalStateException("Session is transacted"); + throw jmse; } } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto - * the queue read by the dispatcher. + * Register to consume from the queue. * - * @param message the message that has been received + * @param queueName */ - public void messageReceived(UnprocessedMessage message) + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { - if (_logger.isDebugEnabled()) + // need to generate a consumer tag on the client so we can exploit the nowait flag + AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); + + FieldTable arguments = FieldTableFactory.newFieldTable(); + if ((messageSelector != null) && !messageSelector.equals("")) + { + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + } + + if (consumer.isAutoClose()) { - _logger.debug("Message[" + (message.getDeliverBody() == null ? - "B:" + message.getBounceBody() : "D:" + message.getDeliverBody()) - + "] received in session with channel id " + _channelId); + arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - if (message.getDeliverBody() == null) + if (consumer.isNoConsume()) { - // Return of the bounced message. - returnBouncedMessage(message); + arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } - else + + consumer.setConsumerTag(tag); + // we must register the consumer in the map before we actually start listening + _consumers.put(tag, consumer); + + try { - _queue.add(message); + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame jmsConsume = + BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments + tag, // consumerTag + consumer.isExclusive(), // exclusive + consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck + consumer.isNoLocal(), // noLocal + nowait, // nowait + queueName, // queue + getTicket()); // ticket + + if (nowait) + { + protocolHandler.writeFrame(jmsConsume); + } + else + { + protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + } + } + catch (AMQException e) + { + // clean-up the map in the event of an error + _consumers.remove(tag); + throw e; } } - private void returnBouncedMessage(final UnprocessedMessage message) + private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + throws JMSException + { + return createProducerImpl(destination, mandatory, immediate, false); + } + + private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent) throws JMSException { - _connection.performConnectionTask( - new Runnable() + return new FailoverRetrySupport( + new FailoverProtectedOperation() { - public void run() + public BasicMessageProducer execute() throws JMSException, FailoverException { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, - message.getContentHeader(), - message.getBodies()); - - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } + checkNotClosed(); + long producerId = getNextProducerId(); + BasicMessageProducer producer = + new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + registerProducer(producerId, producer); + + return producer; + } + }, _connection).execute(); + } - } - catch (Exception e) + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException + { + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); + } + + /** + * Declares the named exchange and type of exchange. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the exchange to declare. + * @param type The type of the exchange to declare. + * @param protocolHandler The protocol handler to process the communication through. + * @param nowait + * + * @throws AMQException If the exchange cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void declareExchange(final AMQShortString name, final AMQShortString type, + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException + { + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + false, // autoDelete + false, // durable + name, // exchange + false, // internal + nowait, // nowait + false, // passive + getTicket(), // ticket + type); // type + + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + + return null; + } + }, _connection).execute(); + } + + /** + * Declares a queue for a JMS destination. + * + *

Note that for queues but not topics the name is generated in the client rather than the server. This allows + * the name to be reused on failover if required. In general, the destination indicates whether it wants a name + * generated or not. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to declare as a queue. + * @param protocolHandler The protocol handler to communicate through. + * + * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of + * the client. + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Verify the destiation is valid or throw an exception. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + throws AMQException + { + /*return new FailoverRetrySupport(*/ + return new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + amqd.setQueueName(protocolHandler.generateQueueName()); } + + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + false, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket + + protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + + return amqd.getAMQQueueName(); } - }); + }, _connection).execute(); } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a - * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. + * Undeclares the specified queue. + * + *

Note that this operation automatically retries in the event of fail-over. * - * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery - * tag + * @param queueName The name of the queue to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) + private void deleteQueue(final AMQShortString queueName) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple - if (_logger.isDebugEnabled()) + try + { + new FailoverRetrySupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + AMQFrame queueDeleteFrame = + QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + false, // ifEmpty + false, // ifUnused + true, // nowait + queueName, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + + return null; + } + }, _connection).execute(); + } + catch (AMQException e) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e); } - getProtocolHandler().writeFrame(ackFrame); - } - - public int getDefaultPrefetch() - { - return _defaultPrefetchHighMark; } - public int getDefaultPrefetchHigh() + private long getNextProducerId() { - return _defaultPrefetchHighMark; + return ++_nextProducerId; } - public int getDefaultPrefetchLow() + private AMQProtocolHandler getProtocolHandler() { - return _defaultPrefetchLowMark; + return _connection.getProtocolHandler(); } - public int getChannelId() + private byte getProtocolMajorVersion() { - return _channelId; + return getProtocolHandler().getProtocolMajorVersion(); } - void start() throws AMQException + private byte getProtocolMinorVersion() { - //fixme This should be controlled by _stopped as it pairs with the stop method - //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. - //will result in sending Flow messages for each subsequent call to flow.. only need to do this - // if we have called stop. - if (_startedAtLeastOnce.getAndSet(true)) - { - //then we stopped this and are restarting, so signal server to resume delivery - suspendChannel(false); - } - - if (hasMessageListeners()) - { - startDistpatcherIfNecessary(); - } + return getProtocolHandler().getProtocolMinorVersion(); } private boolean hasMessageListeners() @@ -2013,56 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _hasMessageListeners; } - void setHasMessageListeners() - { - _hasMessageListeners = true; - } - - synchronized void startDistpatcherIfNecessary() + private void markClosedConsumers() throws JMSException { - // If IMMEDIATE_PREFETCH is not set then we need to start fetching - if (!_immediatePrefetch) + if (_dispatcher != null) { - // We do this now if this is the first call on a started connection - if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) - { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } - } + _dispatcher.close(); + _dispatcher = null; } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + final ArrayList clonedConsumers = new ArrayList(_consumers.values()); - startDistpatcherIfNecessary(false); + final Iterator it = clonedConsumers.iterator(); + while (it.hasNext()) + { + final BasicMessageConsumer con = it.next(); + con.markClosed(); + } + // at this point the _consumers map will be empty } - synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) + private void markClosedProducersAndConsumers() { - if (_dispatcher == null) + try { - _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); - _dispatcher.setConnectionStopped(initiallyStopped); - _dispatcher.start(); + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + closeProducers(); } - else + catch (JMSException e) { - _dispatcher.setConnectionStopped(initiallyStopped); + _logger.error("Error closing session: " + e, e); } - } - - void stop() throws AMQException - { - //stop the server delivering messages to this session - suspendChannel(true); - if (_dispatcher != null) + try { - _dispatcher.setConnectionStopped(true); + markClosedConsumers(); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e, e); } } @@ -2073,7 +2295,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws AMQException */ - void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException + private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException { AMQDestination amqd = consumer.getDestination(); @@ -2083,7 +2305,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQShortString queueName = declareQueue(amqd, protocolHandler); - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2098,7 +2321,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { suspendChannel(true); - _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); + _logger.info( + "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { @@ -2116,36 +2340,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); } - catch (JMSException e) //thrown by getMessageSelector + catch (JMSException e) // thrown by getMessageSelector { throw new AMQException(e.getMessage(), e); } - } - - /** - * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer - * instance. - * - * @param consumer the consum - */ - void deregisterConsumer(BasicMessageConsumer consumer) - { - if (_consumers.remove(consumer.getConsumerTag()) != null) + catch (FailoverException e) { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) - { - _subscriptions.remove(subscriptionName); - } - - Destination dest = consumer.getDestination(); - synchronized (dest) - { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) - { - _destinationConsumerCount.remove(dest); - } - } + throw new AMQException("Fail-over exception interrupted basic consume.", e); } } @@ -2154,35 +2355,54 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _producers.put(new Long(producerId), producer); } - void deregisterProducer(long producerId) - { - _producers.remove(new Long(producerId)); - } - - private long getNextProducerId() + private void rejectAllMessages(boolean requeue) { - return ++_nextProducerId; + rejectMessagesForConsumerTag(null, requeue); } /** - * Resubscribes all producers and consumers. This is called when performing failover. - * - * @throws AMQException + * @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) */ - void resubscribe() throws AMQException - { - resubscribeProducers(); - resubscribeConsumers(); - } - private void resubscribeProducers() throws AMQException + private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) { - ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey - for (Iterator it = producers.iterator(); it.hasNext();) + Iterator messages = _queue.iterator(); + if (_logger.isInfoEnabled()) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); - producer.resubscribe(); + _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + + requeue); + + if (messages.hasNext()) + { + _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + } + else + { + _logger.info("No messages in _queue to reject"); + } + } + while (messages.hasNext()) + { + UnprocessedMessage message = (UnprocessedMessage) messages.next(); + + if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" + + message.getDeliverBody().deliveryTag); + } + + messages.remove(); + + rejectMessage(message, requeue); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } + } } } @@ -2198,284 +2418,349 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel(boolean suspend) throws AMQException + private void resubscribeProducers() throws AMQException { - synchronized (_suspensionLock) + ArrayList producers = new ArrayList(_producers.values()); + _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + for (Iterator it = producers.iterator(); it.hasNext();) { - if (_logger.isDebugEnabled()) + BasicMessageProducer producer = (BasicMessageProducer) it.next(); + producer.resubscribe(); + } + } + + private void returnBouncedMessage(final UnprocessedMessage message) + { + _connection.performConnectionTask(new Runnable() { - _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - } + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange, + message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies()); - _suspended = suspend; + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - !suspend); // active + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); - } + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } + } + }); } - - public void confirmConsumerCancelled(AMQShortString consumerTag) + /** + * Suspends or unsuspends this session. + * + * @param suspend true indicates that the session should be suspended, false indicates that it + * should be unsuspended. + * + * @throws AMQException If the session cannot be suspended for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + private void suspendChannel(boolean suspend) throws AMQException // , FailoverException { - - // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if (consumer != null) + synchronized (_suspensionLock) { -// fixme this isn't right.. needs to check if _queue contains data for this consumer - if (consumer.isAutoClose())// && _queue.isEmpty()) - { - consumer.closeWhenNoMessages(true); - } - - if (!consumer.isNoConsume()) + try { - //Clean the Maps up first - //Flush any pending messages for this consumerTag - if (_dispatcher != null) + if (_logger.isDebugEnabled()) { - _logger.info("Dispatcher is not null"); + _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); } - else - { - _logger.info("Dispatcher is null so created stopped dispatcher"); - startDistpatcherIfNecessary(true); - } + _suspended = suspend; - _dispatcher.rejectPending(consumer); + AMQFrame channelFlowFrame = + ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + !suspend); + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } - else + catch (FailoverException e) { - //Just close the consumer - //fixme the CancelOK is being processed before the arriving messages.. - // The dispatcher is still to process them so the server sent in order but the client - // has yet to receive before the close comes in. - -// consumer.markClosed(); + throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e); } } - else - { - _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); - } - - - } - - /* - * I could have combined the last 3 methods, but this way it improves readability - */ - private AMQTopic checkValidTopic(Topic topic) throws JMSException - { - if (topic == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } - if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); - } - if (!(topic instanceof AMQTopic)) - { - throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); - } - return (AMQTopic) topic; } - private void checkValidQueue(Queue queue) throws InvalidDestinationException + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private class Dispatcher extends Thread { - if (queue == null) + + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + + public Dispatcher() { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); + super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } - } - private void checkValidDestination(Destination destination) throws InvalidDestinationException - { - if (destination == null) + public void close() { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); + _closed.set(true); + interrupt(); + + // fixme awaitTermination + } - } + public void rejectPending(BasicMessageConsumer consumer) + { + synchronized (_lock) + { + boolean stopped = _dispatcher.connectionStopped(); - public AMQShortString getTemporaryTopicExchangeName() - { - return _connection.getTemporaryTopicExchangeName(); - } + if (!stopped) + { + _dispatcher.setConnectionStopped(true); + } - public AMQShortString getTemporaryQueueExchangeName() - { - return _connection.getTemporaryQueueExchangeName(); - } + // Reject messages on pre-receive queue + consumer.rollback(); + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - public int getTicket() - { - return _ticket; - } + // closeConsumer + consumer.markClosed(); - public void setTicket(int ticket) - { - _ticket = ticket; - } + _dispatcher.setConnectionStopped(stopped); + } + } - public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException - { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + public void rollback() + { - } + synchronized (_lock) + { + boolean isStopped = connectionStopped(); - private class SuspenderRunner implements Runnable - { - private boolean _suspend; + if (!isStopped) + { + setConnectionStopped(true); + } + + rejectAllMessages(true); + + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + if (!consumer.isNoConsume()) + { + consumer.rollback(); + } + else + { + // should perhaps clear the _SQ here. + // consumer._synchronousQueue.clear(); + consumer.clearReceiveQueue(); + } + + } + + setConnectionStopped(isStopped); + } - public SuspenderRunner(boolean suspend) - { - _suspend = suspend; } public void run() { - try + if (_dispatcherLogger.isInfoEnabled()) { - suspendChannel(_suspend); + _dispatcherLogger.info(getName() + " started"); } - catch (AMQException e) + + UnprocessedMessage message; + + // Allow disptacher to start stopped + synchronized (_lock) { - _logger.warn("Unable to suspend channel"); + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } } - } - } + try + { + while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) + { + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(null, requeue); - } + dispatchMessage(message); - /** - * @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) - */ + while (connectionStopped()) + { + _lock.wait(); + } - private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) - { - Iterator messages = _queue.iterator(); - if (_logger.isInfoEnabled()) - { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + - ") (PDispatchQ) requeue:" + requeue); + } - if (messages.hasNext()) + } + } + catch (InterruptedException e) { - _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + // ignore } - else + + if (_dispatcherLogger.isInfoEnabled()) { - _logger.info("No messages in _queue to reject"); + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } } - while (messages.hasNext()) + + // only call while holding lock + final boolean connectionStopped() { - UnprocessedMessage message = (UnprocessedMessage) messages.next(); + return _connectionStopped; + } - if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + boolean setConnectionStopped(boolean connectionStopped) + { + boolean currently; + synchronized (_lock) { - if (_logger.isDebugEnabled()) + currently = _connectionStopped; + _connectionStopped = connectionStopped; + _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) { - _logger.debug("Removing message(" + System.identityHashCode(message) + - ") from _queue DT:" + message.getDeliverBody().deliveryTag); + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); } + } - messages.remove(); + return currently; + } - rejectMessage(message, requeue); + private void dispatchMessage(UnprocessedMessage message) + { + if (message.getDeliverBody() != null) + { + final BasicMessageConsumer consumer = + (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); - if (_logger.isDebugEnabled()) + if ((consumer == null) || consumer.isClosed()) { - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + if (_dispatcherLogger.isInfoEnabled()) + { + if (consumer == null) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)..."); + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliverBody().deliveryTag + "] from queue " + " consumer(" + + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); + } + } + // Don't reject if we're already closing + if (!_closed.get()) + { + rejectMessage(message, true); + } + } + else + { + consumer.notifyMessage(message, _channelId); } } } } - - public void rejectMessage(UnprocessedMessage message, boolean requeue) + /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, + boolean read) throws AMQException { + getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), + getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write), + new BlockingMethodFrameListener(_channelId) + { - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); - } + public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException + { + if (frame instanceof AccessRequestOkBody) + { + setTicket(((AccessRequestOkBody) frame).getTicket()); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); - } + return true; + } + else + { + return false; + } + } + }); + }*/ - public void rejectMessage(AbstractJMSMessage message, boolean requeue) + private class SuspenderRunner implements Runnable { - if (_logger.isTraceEnabled()) + private boolean _suspend; + + public SuspenderRunner(boolean suspend) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + _suspend = suspend; } - rejectMessage(message.getDeliveryTag(), requeue); - - } - public void rejectMessage(long deliveryTag, boolean requeue) - { - if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || - _acknowledgeMode == SESSION_TRANSACTED) + public void run() { - if (_logger.isDebugEnabled()) + try { - _logger.debug("Rejecting delivery tag:" + deliveryTag); + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); } - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); - - _connection.getProtocolHandler().writeFrame(basicRejectBody); } } - - public boolean isStrictAMQP() - { - return _strictAMQP; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1c3cdbcb65..3a31eda754 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,22 +20,10 @@ */ package org.apache.qpid.client; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -48,6 +36,19 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + public class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); @@ -140,9 +141,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private List _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; _connection = connection; @@ -219,7 +220,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " - + _destination); + + _destination); } } else @@ -468,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " close():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -481,9 +482,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { // TODO: Be aware of possible changes to parameter order as versions change. final AMQFrame cancelFrame = - BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag - false); // nowait + BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), + _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag + false); // nowait try { @@ -497,10 +498,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (AMQException e) { - // _logger.error("Error closing consumer: " + e, e); - JMSException jmse = new JMSException("Error closing consumer: " + e); - jmse.setLinkedException(e); - throw jmse; + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } else @@ -540,7 +542,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " markClosed():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); } else @@ -572,9 +574,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies()); if (debug) { @@ -659,15 +661,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - break; + case Session.PRE_ACKNOWLEDGE: + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); - break; + case Session.CLIENT_ACKNOWLEDGE: + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + break; } } @@ -677,55 +679,55 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.CLIENT_ACKNOWLEDGE: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } + case Session.DUPS_OK_ACKNOWLEDGE: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } - if (_outstanding <= _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } - if (_dups_ok_acknowledge_send) + if (_dups_ok_acknowledge_send) + { + if (!_session.isInRecovery()) { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - } + _session.acknowledgeMessage(msg.getDeliveryTag(), true); } + } - break; + break; - case Session.AUTO_ACKNOWLEDGE: - // we do not auto ack a message if the application code called recover() - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + case Session.AUTO_ACKNOWLEDGE: + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } - break; + break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } - break; + break; } } @@ -757,7 +759,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_closedStack != null) { _logger.trace(_consumerTag + " notifyError():" - + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); _logger.trace(_consumerTag + " previously" + _closedStack.toString()); } else @@ -817,7 +819,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() throws JMSException + public void acknowledge() // throws JMSException { if (!isClosed()) { @@ -877,7 +879,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Long tag = _receivedDeliveryTags.poll(); @@ -907,7 +909,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_logger.isDebugEnabled()) { _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" - + "for consumer with tag:" + _consumerTag); + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -931,7 +933,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer else { _logger.error("Queue contained a :" + o.getClass() - + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); iterator.remove(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java index d1237cff49..0927ca3625 100644 --- a/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java +++ b/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java @@ -18,29 +18,12 @@ * under the License. * */ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ package org.apache.qpid.client; -import javax.jms.JMSException; - import org.apache.qpid.AMQException; +import javax.jms.JMSException; + /** * JMSException does not accept wrapped exceptions in its constructor. Presumably this is because it is a relatively old * Java exception class, before this was added as a default to Throwable. This exception class accepts wrapped exceptions @@ -50,8 +33,6 @@ import org.apache.qpid.AMQException; * Responsibilities Collaborations * Accept wrapped exceptions as a JMSException. * - * - * @author Apache Software Foundation */ public class JMSAMQException extends JMSException { @@ -71,6 +52,11 @@ public class JMSAMQException extends JMSException } } + /** + * @param s The underlying exception. + * + * @deprecated Use the other constructor and write a helpfull message. This one will be deleted. + */ public JMSAMQException(AMQException s) { super(s.getMessage(), String.valueOf(s.getErrorCode())); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java index 49377fdc19..037b0dc2d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,10 +21,26 @@ package org.apache.qpid.client.failover; /** - * This exception is thrown when failover is taking place and we need to let other - * parts of the client know about this. + * FailoverException is used to indicate that a synchronous request has failed to receive the reply that it is waiting + * for because the fail-over process has been started whilst it was waiting for its reply. Synchronous methods generally + * raise this exception to indicate that they must be re-tried once the fail-over process has completed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Used to indicate failure of a synchronous request due to fail-over. + *
+ * + * @todo This exception is created and passed as an argument to a method, rather than thrown. The exception is being + * used to represent an event, passed out to other threads. Use of exceptions as arguments rather than as + * exceptions is extremly confusing. Ideally use a condition or set a flag and check it instead. + * This exceptions-as-events pattern seems to be in a similar style to Mina code, which is not pretty, but + * potentially acceptable for that reason. We have the option of extending the mina model to add more events + * to it, that is, anything that is interested in handling failover as an event occurs below the main + * amq event handler, which knows the specific interface of the qpid handlers, which can pass this down as + * an explicit event, without it being an exception. Add failover method to BlockingMethodFrameListener, + * have it set a flag or interrupt the waiting thread, which then creates and raises this exception. */ -public class FailoverException extends RuntimeException +public class FailoverException extends Exception { public FailoverException(String message) { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..dbbceff523 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,59 +20,108 @@ */ package org.apache.qpid.client.failover; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IoSession; + import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; +import java.util.concurrent.CountDownLatch; + /** - * When failover is required, we need a separate thread to handle the establishment of the new connection and - * the transfer of subscriptions. - *

- * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor - * thread. One significant task is the connection setup which involves a protocol exchange until a particular state - * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means - * the IO processor is not able to do anything at all. + * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the + * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport + * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new + * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that + * detected the failure and is used to handle the communication to establish a new connection. + * + *

The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor + * thread. The significant task is the connection setup which involves a protocol exchange until a particular state + * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work + * it needs to do to achieve the new state. + * + *

The failover procedure does the following: + * + *

    + *
  1. Sets the failing over condition to true.
  2. + *
  3. Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all + * interested parties.
  4. + *
  5. Takes the failover mutex on the protocol connection handler.
  6. + *
  7. Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition + * reset.
  8. + *
  9. Creates a new {@link AMQStateManager} and re-established the connection through it.
  10. + *
  11. Informs the AMQConnection if the connection cannot be re-established.
  12. + *
  13. Recreates all sessions from the old connection to the new.
  14. + *
  15. Resets the failing over condition and releases the mutex.
  16. + *
+ * + *

+ *
CRC Card
Responsibilities Collaborations + *
Update fail-over state {@link AMQProtocolHandler} + *
+ * + * @todo The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition + * then could change over to using them. 1.4 support still needed. + * + * @todo If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the + * condition, they will never be released. It might be an idea to reset the condition in a finally block. + * + * @todo Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection. + * + * @todo Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an + * exception-as-argument here, could just as easily call a specific method for this purpose on + * {@link org.apache.qpid.protocol.AMQMethodListener}. */ public class FailoverHandler implements Runnable { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(FailoverHandler.class); + /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ private final IoSession _session; + + /** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */ private AMQProtocolHandler _amqProtocolHandler; - /** - * Used where forcing the failover host - */ + /** Used to hold the host to fail over to. This is optional and if not set a reconnect to the previous host is tried. */ private String _host; - /** - * Used where forcing the failover port - */ + /** Used to hold the port to fail over to. */ private int _port; + /** + * Creates a failover handler on a protocol session, for a particular MINA session (network connection). + * + * @param amqProtocolHandler The protocol handler that spans the failover. + * @param session The MINA session, for the failing connection. + */ public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) { _amqProtocolHandler = amqProtocolHandler; _session = session; } + /** + * Performs the failover procedure. See the class level comment, {@link FailoverHandler}, for a description of the + * failover procedure. + */ public void run() { if (Thread.currentThread().isDaemon()) { throw new IllegalStateException("FailoverHandler must run on a non-daemon thread."); } - //Thread.currentThread().setName("Failover Thread"); + // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of + // the fail over. _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1)); // We wake up listeners. If they can handle failover, they will extend the - // FailoverSupport class and will in turn block on the latch until failover - // has completed before retrying the operation + // FailoverRetrySupport class and will in turn block on the latch until failover + // has completed before retrying the operation. _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); // Since failover impacts several structures we protect them all with a single mutex. These structures @@ -93,14 +142,18 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Redirect was vetoed by client")); } else { - _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Failover was vetoed by client")); } + _amqProtocolHandler.getFailoverLatch().countDown(); _amqProtocolHandler.setFailoverLatch(null); + return; } @@ -119,12 +172,12 @@ public class FailoverHandler implements Runnable { failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(); } + if (!failoverSucceeded) { _amqProtocolHandler.setStateManager(existingStateManager); - _amqProtocolHandler.getConnection().exceptionReceived( - new AMQDisconnectedException("Server closed connection and no failover " + - "was successful")); + _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( + "Server closed connection and no failover " + "was successful")); } else { @@ -140,6 +193,7 @@ public class FailoverHandler implements Runnable { _logger.info("Client vetoed automatic resubscription"); } + _amqProtocolHandler.getConnection().fireFailoverComplete(); _amqProtocolHandler.setFailoverState(FailoverState.NOT_STARTED); _logger.info("Connection failover completed successfully"); @@ -148,35 +202,36 @@ public class FailoverHandler implements Runnable { _logger.info("Failover process failed - exception being propagated by protocol handler"); _amqProtocolHandler.setFailoverState(FailoverState.FAILED); - try - { - _amqProtocolHandler.exceptionCaught(_session, e); - } + /*try + {*/ + _amqProtocolHandler.exceptionCaught(_session, e); + /*} catch (Exception ex) { _logger.error("Error notifying protocol session of error: " + ex, ex); - } + }*/ } } } - _amqProtocolHandler.getFailoverLatch().countDown(); - } - public String getHost() - { - return _host; + _amqProtocolHandler.getFailoverLatch().countDown(); } + /** + * Sets the host name to fail over to. This is optional and if not set a reconnect to the previous host is tried. + * + * @param host The host name to fail over to. + */ public void setHost(String host) { _host = host; } - public int getPort() - { - return _port; - } - + /** + * Sets the port to fail over to. + * + * @param port The port to fail over to. + */ public void setPort(int port) { _port = port; diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java new file mode 100644 index 0000000000..dece1b6c3f --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -0,0 +1,54 @@ +package org.apache.qpid.client.failover; + +import org.apache.qpid.client.AMQConnection; + +/** + * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support + * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this + * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be + * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception, + * for example, because the caller already holds locks preventing that condition from arising. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Perform a fail-over protected operation with no handling of fail-over conditions. + *
+ */ +public class FailoverNoopSupport implements FailoverSupport +{ + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverNoopSupport(FailoverProtectedOperation operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new IllegalStateException("Fail-over interupted no-op failover support. " + + "No-op support should only be used where the caller is certaing fail-over cannot occur.", e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java new file mode 100644 index 0000000000..efb7bf8aed --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java @@ -0,0 +1,30 @@ +package org.apache.qpid.client.failover; + +import org.apache.qpid.AMQException; + +/** + * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because + * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers + * for failover protected operations, in order to provide different handling schemes when failovers occurr. + * + *

The type of checked exception that the operation may perform has been generified, in order that fail over + * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not + * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes + * will mask the exception. + * + *

+ *
CRC Card
Responsibilities + *
Perform an operation that may be interrupted by fail-over. + *
+ */ +public interface FailoverProtectedOperation +{ + /** + * Performs the continuations work. + * + * @return Provdes scope for the continuation to return an arbitrary value. + * + * @throws FailoverException If the operation is interrupted by a fail-over notification. + */ + public abstract T execute() throws E, FailoverException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java new file mode 100644 index 0000000000..1e4908976b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; + +import javax.jms.JMSException; + +/** + * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified + * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due + * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request, + * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This + * automatic retrying is continued until the continuation succeeds, or throws an exception (different to + * FailoverException, which is used to signal the failure of the original condition). + * + *

The blocking condition used is that the connection is not currently failing over, and the mutex used is the + * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods. + * These are used like a lock and condition variable. + * + *

The wrapped operation may throw a FailoverException, this is an exception that can be raised by a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a + * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are + * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous + * methods that should not be attempted when a fail-over is in progress. + * + *

Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want + * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation + * until it succeeds. Synchronous methods are usually coordinated with a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants + * to start and throws a FailoverException in response to this. + * + *

Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide a continuation synchronized on a fail-over lock and condition. + *
Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception. + *
+ * + * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see + * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary + * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. + * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type + * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. + * + * @todo InterruptedException not handled well. + */ +public class FailoverRetrySupport implements FailoverSupport +{ + /** Used for debugging. */ + private static final Logger _log = Logger.getLogger(FailoverRetrySupport.class); + + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverRetrySupport(FailoverProtectedOperation operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats + * until the operation throws AMQException or succeeds without being interrupted by fail-over. + * + * @return The result of executing the continuation. + * + * @throws AMQException Any underlying exception is allowed to fall through. + */ + public T execute() throws E + { + while (true) + { + try + { + connection.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _log.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (connection.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _log.debug("Failover exception caught during operation: " + e, e); + } + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java index a005bc5fdf..41bac34a34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java @@ -1,65 +1,28 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client.failover; -import javax.jms.JMSException; - -import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -public abstract class FailoverSupport +/** + * FailoverSupport defines an interface for different types of fail-over handlers, that provide different types of + * behaviour for handling fail-overs during operations that can be interrupted by the fail-over process. For example, + * the support could automatically retry once the fail-over process completes, could prevent an operation from being + * started whilst fail-over is running, or could quietly abandon the operation or raise an exception, and so on. + * + *

+ *
CRC Card
Responsibilities + *
Perform a fail-over protected operation with handling for fail-over conditions. + *
+ * + * @todo Continuation, extend some sort of re-usable Continuation interface, which might look very like this one. + */ +public interface FailoverSupport { - private static final Logger _log = Logger.getLogger(FailoverSupport.class); - - public Object execute(AMQConnection con) throws JMSException - { - // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. - // Any method that can potentially block for any reason should use this class so that deadlock will not - // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) - // that might be causing a block. When that happens, the exception is caught here and the mutex is released - // before waiting for the failover to complete (either successfully or unsuccessfully). - while (true) - { - try - { - con.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.info("Interrupted: " + e, e); - return null; - } - synchronized (con.getFailoverMutex()) - { - try - { - return operation(); - } - catch (FailoverException e) - { - _log.info("Failover exception caught during operation: " + e, e); - } - } - } - } - - protected abstract Object operation() throws JMSException; + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E; } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index addef94215..5bf7bffc63 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,22 @@ */ package org.apache.qpid.client.protocol; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; + import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; @@ -60,22 +58,86 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +/** + * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the + * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the + * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the + * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, + * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in + * terms of "message received" and so on. + * + *

There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is + * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public + * API calls through which an individual connection can be manipulated. This protocol handler talks to the network + * through MINA, in a behind the scenes role; it is not an exposed part of the client API. + * + *

There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, + * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in + * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol + * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions + * in the event of failover. See below for more information about this. + * + *

Mina provides a session container that can be used to store/retrieve arbitrary objects as String named + * attributes. A more convenient, type-safe, container for session data is provided in the form of + * {@link AMQProtocolSession}. + * + *

A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session + * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper + * as described above). This event handler is different, because dealing with failover complicates things. To the + * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but + * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot + * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old + * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection + * and the protocol session data is held outside of the MINA IOSession. + * + *

This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from + * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, + * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Create the filter chain to filter this handlers events. + * {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * + *
Maintain fail-over state. + *
+ *
+ * + * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? + * + * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. + */ public class AMQProtocolHandler extends IoHandlerAdapter { + /** Used for debugging. */ private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection + * instances and protocol handler instances. */ private AMQConnection _connection; /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; + /** Holds the state of the protocol session. */ private AMQStateManager _stateManager = new AMQStateManager(); + /** Holds the method listeners, */ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); /** @@ -91,15 +153,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private FailoverState _failoverState = FailoverState.NOT_STARTED; + /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** + * Creates a new protocol handler, associated with the specified client connection instance. + * + * @param con The client connection that this is the event handler for. + */ public AMQProtocolHandler(AMQConnection con) { _connection = con; } + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. + * + * @param session The MINA session. + * + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ public void sessionCreated(IoSession session) throws Exception { _logger.debug("Protocol session created for session " + System.identityHashCode(session)); @@ -119,16 +197,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_connection.getSSLConfiguration() != null) { SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLContextFactory sslFactory = + new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } - try { - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); threadModel.getAsynchronousReadFilter().createNewJobForSession(session); threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); @@ -142,35 +219,38 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.init(); } - public void sessionOpened(IoSession session) throws Exception - { - //System.setProperty("foo", "bar"); - } - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. + * Called when the network connection is closed. This can happen, either because the client explicitly requested + * that the connection be closed, in which case nothing is done, or because the connection died. In the case + * where the connection died, an attempt to failover automatically to a new connection may be started. The failover + * process will be started, provided that it is the clients policy to allow failover, and provided that a failover + * has not already been started or failed. + * + *

It is important to note that when the connection dies this method may be called or {@link #exceptionCaught} + * may be called first followed by this method. This depends on whether the client was trying to send data at the + * time of the failure. * - * @param session + * @param session The MINA session. * - * @throws Exception + * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and + * not otherwise? The above comment doesn't make that clear. */ - public void sessionClosed(IoSession session) throws Exception + public void sessionClosed(IoSession session) { if (_connection.isClosed()) { - _logger.info("Session closed called by client"); + _logger.debug("Session closed called by client"); } else { - _logger.info("Session closed called with failover state currently " + _failoverState); + _logger.debug("Session closed called with failover state currently " + _failoverState); - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions // known through the policy settings. if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) { - _logger.info("FAILOVER STARTING"); + _logger.debug("FAILOVER STARTING"); if (_failoverState == FailoverState.NOT_STARTED) { _failoverState = FailoverState.IN_PROGRESS; @@ -178,12 +258,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter } else { - _logger.info("Not starting failover as state currently " + _failoverState); + _logger.debug("Not starting failover as state currently " + _failoverState); } } else { - _logger.info("Failover not allowed by policy."); + _logger.debug("Failover not allowed by policy."); // or already in progress? if (_logger.isDebugEnabled()) { @@ -192,19 +272,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState != FailoverState.IN_PROGRESS) { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.")); } else { - _logger.info("sessionClose() failover in progress"); + _logger.debug("sessionClose() failover in progress"); } } } - _logger.info("Protocol Session [" + this + "] closed"); + _logger.debug("Protocol Session [" + this + "] closed"); } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -223,25 +302,32 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { - //write heartbeat frame: + // write heartbeat frame: _logger.debug("Sent heartbeat"); session.write(HeartbeatBody.FRAME); HeartbeatDiagnostics.sent(); } else if (IdleStatus.READER_IDLE.equals(status)) { - //failover: + // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); session.close(); } } - public void exceptionCaught(IoSession session, Throwable cause) throws Exception + /** + * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an + * IOException, MINA will close the connection automatically. + * + * @param session The MINA session. + * @param cause The exception that triggered this event. + */ + public void exceptionCaught(IoSession session, Throwable cause) { if (_failoverState == FailoverState.NOT_STARTED) { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) if (cause instanceof AMQConnectionClosedException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); @@ -250,8 +336,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter sessionClosed(session); } - //FIXME Need to correctly handle other exceptions. Things like ... -// if (cause instanceof AMQChannelClosedException) + // FIXME Need to correctly handle other exceptions. Things like ... + // if (cause instanceof AMQChannelClosedException) // which will cause the JMSSession to end due to a channel close and so that Session needs // to be removed from the map so we can correctly still call close without an exception when trying to close // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception @@ -261,6 +347,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter else if (_failoverState == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); + // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); @@ -297,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; - if (debug && (msgNumber % 1000 == 0)) + if (debug && ((msgNumber % 1000) == 0)) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } @@ -310,72 +397,77 @@ public class AMQProtocolHandler extends IoHandlerAdapter switch (bodyFrame.getFrameType()) { - case AMQMethodBody.TYPE: + case AMQMethodBody.TYPE: - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - final AMQMethodEvent evt = new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); + final AMQMethodEvent evt = + new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); - try - { + try + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } - catch (AMQException e) + + if (!wasAnyoneInterested) { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + getStateManager().error(e); + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); } - exceptionCaught(session, e); } - break; - case ContentHeaderBody.TYPE: + exceptionCaught(session, e); + } - _protocolSession.messageContentHeaderReceived(frame.getChannel(), - (ContentHeaderBody) bodyFrame); - break; + break; - case ContentBody.TYPE: + case ContentHeaderBody.TYPE: - _protocolSession.messageContentBodyReceived(frame.getChannel(), - (ContentBody) bodyFrame); - break; + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - case HeartbeatBody.TYPE: + case ContentBody.TYPE: - if (debug) - { - _logger.debug("Received heartbeat"); - } - break; + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if (debug) + { + _logger.debug("Received heartbeat"); + } + + break; - default: + default: } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -387,10 +479,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter final boolean debug = _logger.isDebugEnabled(); - if (debug && (sentMessages % 1000 == 0)) + if (debug && ((sentMessages % 1000) == 0)) { _logger.debug("Sent " + _messagesOut + " protocol messages"); } + _connection.bytesSent(session.getWrittenBytes()); if (debug) { @@ -408,7 +501,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _frameListeners.remove(listener); } - */ + */ public void attainState(AMQState s) throws AMQException { getStateManager().attainState(s); @@ -437,9 +530,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) + throws AMQException, FailoverException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } @@ -451,9 +543,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) - throws AMQException + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, + long timeout) throws AMQException, FailoverException { try { @@ -461,9 +552,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); + return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener } catch (AMQException e) { @@ -478,25 +570,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); + return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), + timeout); } - - public void closeSession(AMQSession session) throws AMQException { _protocolSession.closeSession(session); } + /** + * Closes the connection. + * + *

If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed + * anyway. + * + * @param timeout The timeout to wait for an acknowledgement to the close request. + * + * @throws AMQException If the close fails for any reason. + */ public void closeConnection(long timeout) throws AMQException { getStateManager().changeState(AMQState.CONNECTION_CLOSING); @@ -504,13 +604,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText + final AMQFrame frame = + ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(), + _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection.")); // replyText try { @@ -521,8 +621,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolSession.closeProtocolSession(false); } - - + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } /** @return the number of bytes read from this protocol session */ @@ -604,7 +706,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter return _protocolSession.getProtocolMajorVersion(); } - public byte getProtocolMinorVersion() { return _protocolSession.getProtocolMinorVersion(); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..86db9d5859 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,71 +27,137 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +/** + * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of + * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or + * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this + * differs from a 'rendezvous' in that sense. + * + *

BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response. + * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they + * have been completed. + * + *

The {@link #processMethod} must return true on any incoming method that it handles. This indicates to + * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to + * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The + * channel id must be passed to the constructor. + * + *

Errors from the producer are rethrown to the consumer. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Accept notification of AMQP method events. {@link AMQMethodEvent} + *
Delegate handling of the method to another method listener. {@link AMQMethodBody} + *
Block until a method is handled by the delegated to handler. + *
Propagate the most recent exception to the consumer. + *
+ * + * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a + * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations + * seem to use it. So wrapping the listeners is possible. + * + * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener, + * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot + * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for + * method has been received. + * + * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry + * when this happens. At the very least, restore the interrupted status flag. + * + * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to + * check that SynchronousQueue has a non-blocking put method available. + */ public abstract class BlockingMethodFrameListener implements AMQMethodListener { + /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; - public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException; - + /** Used to protect the shared event and ready flag between the producer and consumer. */ private final Object _lock = new Object(); - /** - * This is set if there is an exception thrown from processCommandFrame and the - * exception is rethrown to the caller of blockForFrame() - */ + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; + /** Holds the channel id for the channel upon which this listener is waiting for a response. */ protected int _channelId; + /** Holds the incoming method. */ protected AMQMethodEvent _doneEvt = null; + /** + * Creates a new method listener, that filters incoming method to just those that match the specified channel id. + * + * @param channelId The channel id to filter incoming methods with. + */ public BlockingMethodFrameListener(int channelId) { _channelId = channelId; } /** - * This method is called by the MINA dispatching thread. Note that it could - * be called before blockForFrame() has been called. + * Delegates any additional handling of the incoming methods to another handler. * - * @param evt the frame event - * @return true if the listener has dealt with this frame - * @throws AMQException + * @param channelId The channel id of the incoming method. + * @param frame The method body. + * + * @return true if the method was handled, false otherwise. */ - public boolean methodReceived(AMQMethodEvent evt) throws AMQException + public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException; + + /** + * Informs this listener that an AMQP method has been received. + * + * @param evt The AMQP method. + * + * @return true if this listener has handled the method, false otherwise. + */ + public boolean methodReceived(AMQMethodEvent evt) // throws AMQException { AMQMethodBody method = evt.getMethod(); - try + /*try + {*/ + boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); + + if (ready) { - boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - if (ready) + // we only update the flag from inside the synchronized block + // so that the blockForFrame method cannot "miss" an update - it + // will only ever read the flag from within the synchronized block + synchronized (_lock) { - // we only update the flag from inside the synchronized block - // so that the blockForFrame method cannot "miss" an update - it - // will only ever read the flag from within the synchronized block - synchronized (_lock) - { - _doneEvt = evt; - _ready = ready; - _lock.notify(); - } + _doneEvt = evt; + _ready = ready; + _lock.notify(); } - return ready; } + + return ready; + + /*} catch (AMQException e) { error(e); // we rethrow the error here, and the code in the frame dispatcher will go round // each listener informing them that an exception has been thrown throw e; - } + }*/ } /** - * This method is called by the thread that wants to wait for a frame. + * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout + * has passed. + * + * @param timeout The timeout in milliseconds. + * + * @return The AMQP method that was received. + * + * @throws AMQException + * @throws FailoverException */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException + public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { synchronized (_lock) { @@ -117,24 +183,25 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener catch (InterruptedException e) { // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess -// if (!_ready && timeout != -1) -// { -// _error = new AMQException("Server did not respond timely"); -// _ready = true; -// } + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } } } } + if (_error != null) { if (_error instanceof AMQException) { - throw(AMQException) _error; + throw (AMQException) _error; } else if (_error instanceof FailoverException) { - // This should ensure that FailoverException is not wrapped and can be caught. - throw(FailoverException) _error; // needed to expose FailoverException. + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. } else { @@ -156,6 +223,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener // set the error so that the thread that is blocking (against blockForFrame()) // can pick up the exception and rethrow to the caller _error = e; + synchronized (_lock) { _ready = true; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 1c70ded62a..623591e0b6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener _expectedClass = expectedClass; } - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException { return _expectedClass.isInstance(frame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 642b928d81..0fc39a9318 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.util; - +import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.Iterator; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the * caller is not obliged to react to the events.

This implementation is only safe where we have a single * thread adding items and a single (different) thread removing items. + * + * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ public class FlowControllingBlockingQueue { @@ -81,6 +82,7 @@ public class FlowControllingBlockingQueue } } } + return o; } @@ -104,4 +106,3 @@ public class FlowControllingBlockingQueue return _queue.iterator(); } } - diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 51bbe7d0e6..c201e88104 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -14,42 +14,43 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.channelclose; import junit.framework.TestCase; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.jms.JMSException; -import javax.jms.ExceptionListener; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.jms.Queue; +import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.jms.ConnectionListener; -import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con TransportConnection.killAllVMBrokers(); } - /* close channel, use chanel with same id ensure error. - */ - public void testReusingChannelAfterFullClosure() + */ + public void testReusingChannelAfterFullClosure() throws Exception { _connection = newConnection(); - //Create Producer + // Create Producer try { _connection.start(); @@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con { _logger.info("Exception occured was:" + e.getErrorCode()); } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con /* close channel and send guff then send ok no errors */ - public void testSendingMethodsAfterClose() + public void testSendingMethodsAfterClose() throws Exception { try { - _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); ((AMQConnection) _connection).setConnectionListener(this); - _connection.setExceptionListener(this); - //Change the StateManager for one that doesn't respond with Close-OKs + // Change the StateManager for one that doesn't respond with Close-OKs AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); _connection.start(); - //Test connection + // Test connection checkSendingMessage(); - //Set StateManager to manager that ignores Close-oks + // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con createChannelAndTest(TEST_CHANNEL); - //Test connection is still ok + // Test connection is still ok checkSendingMessage(); @@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } } - private void createChannelAndTest(int channel) + private void createChannelAndTest(int channel) throws FailoverException { - //Create A channel + // Create A channel try { createChannel(channel); @@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = + ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } - private void checkSendingMessage() throws JMSException { TEST++; @@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con AMQConnection connection = null; try { - connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); connection.setConnectionListener(this); @@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con fail("Creating new connection when:" + e.getMessage()); } - return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type if (nowait) { @@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, + SYNC_TIMEOUT); } } - private void createChannel(int channelId) throws AMQException + private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); } - public void onException(JMSException jmsException) { - //_logger.info("CCT" + jmsException); + // _logger.info("CCT" + jmsException); fail(jmsException.getMessage()); } public void bytesSent(long count) - { - } + { } public void bytesReceived(long count) - { - - } + { } public boolean preFailover(boolean redirect) { @@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void failoverComplete() - { - } + { } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index d52707d965..58ac8294f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close; import junit.framework.TestCase; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; -import javax.jms.TextMessage; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidClientConnection; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends TestCase { @@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) // clean up + if (!passed) // clean up { QpidClientConnection conn = new QpidClientConnection(BROKER); @@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } + TransportConnection.killVMBroker(1); } @@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; - long messageLog[] = new long[numTestMessages + 1]; + long[] messageLog = new long[numTestMessages + 1]; _logger.info("consuming..."); Message msg = consumer.receive(1000); @@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase int msgindex = msg.getIntProperty("index"); if (messageLog[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); } if (dt == 0) @@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase messageLog[msgindex] = dt; - //get Next message + // get Next message msg = consumer.receive(1000); } @@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase for (long b : messageLog) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase index++; } + assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); @@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase t1.start(); t2.start(); t3.start(); -// t4.start(); + // t4.start(); try { @@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase for (long b : receieved) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase list.append(b); failed++; } + index++; } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); passed = true; @@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase int msgindex = result.getIntProperty("index"); if (receieved[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); } if (dt == 0) @@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase receieved[msgindex] = dt; } - count++; - if (count % 100 == 0) + if ((count % 100) == 0) { _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); } @@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; -// while (run < 10) + // while (run < 10) { run++; @@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase assertNotNull("Message should not be null", msg); - // As we have not ack'd message will be requeued. _logger.debug("Close Consumer"); consumer.close(); @@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase } } -} \ No newline at end of file +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0e718da19b..8d96977df2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.Session; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; /** @@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase { TransportConnection.createVMBroker(1); } + testMethod++; queue += testMethod; - newConnection(); } @@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); } - /** * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order * * @throws Exception On error */ - public void testSend2ThenRollback() throws Exception + /*public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase } assertNull("test message should be null", result); - } + }*/ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else // or it will be msg 2 arriving the first time due to latency. + else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); assertEquals("2", ((TextMessage) result).getText()); @@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase } - public void testPutThenRollbackThenGet() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index 195ed79dab..d52da06f76 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -1,25 +1,25 @@ package org.apache.qpid.testutil; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; public class QpidClientConnection implements ExceptionListener { - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); private boolean transacted = true; @@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener setPrefetch(5000); } - public void connect() throws JMSException { if (!connected) { /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { @@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - _logger.info("starting connection"); connection.start(); @@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener this.prefetch = prefetch; } - /** override as necessary */ public void onException(JMSException exception) { @@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener _logger.info("consumed: " + messagesReceived); } } - diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java index 931c6cd87a..eb736d437f 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -23,14 +23,17 @@ package org.apache.qpid; import org.apache.qpid.protocol.AMQConstant; /** - * AMQConnectionClosedException indicates that an operation cannot be performed becauase a connection has been closed. + * AMQConnectionClosedException indicates that a connection has been closed. + * + *

This exception is really used as an event, in order that the method handler that raises it creates an event + * which is propagated to the io handler, in order to notify it of the connection closure. * *

*
CRC Card
Responsibilities Collaborations - *
Represents a failed operation on a closed conneciton. + *
Represents a the closure of a connection. *
* - * @todo Does this duplicate AMQConnectionException? + * @todo Should review where exceptions-as-events */ public class AMQConnectionClosedException extends AMQException { diff --git a/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java b/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java index 883e13e5e6..a0574efa72 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java @@ -14,13 +14,22 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid; - +/** + * AMQPInvalidClassException indicates an error when trying to store an illegally typed argument in a field table. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents illegal argument type for field table values. + *
+ * + * @todo Could just re-use an exising exception like IllegalArgumentException or ClassCastException. + */ public class AMQPInvalidClassException extends RuntimeException { public AMQPInvalidClassException(String s) diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 808272e9ec..2fbeeda1d4 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.AMQMethodBody; * *

An event listener may be associated with a particular context, usually an AMQP channel, and in addition to * receiving method events will be notified of errors on that context. This enables listeners to perform any clean - * up that they need to do before the context is closed. + * up that they need to do before the context is closed or retried. * *

*
CRC Card
Responsibilities @@ -64,8 +64,6 @@ public interface AMQMethodListener * any necessary clean-up for the context. * * @param e The underlying exception that is the source of the error. - * - * @todo Consider narrowing the exception, or wrapping it. */ void error(Exception e); } diff --git a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java index faeb9d7167..10f6a27293 100644 --- a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java @@ -26,6 +26,8 @@ package org.apache.qpid.util; *

*
CRC Card
Responsibilities Collaborations *
+ * + * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays. */ public class PrettyPrintingUtils { diff --git a/java/perftests/RunningPerformanceTests.txt b/java/perftests/RunningPerformanceTests.txt index 95051c088e..54291483bf 100644 --- a/java/perftests/RunningPerformanceTests.txt +++ b/java/perftests/RunningPerformanceTests.txt @@ -116,7 +116,7 @@ The specific performance test cases for QPid are implemented as extensions to JU The most common test case to run is implemented in the class PingAsyncTestPerf, which sends and recieves messages simultaneously. This class uses a PingPongProdicer to do its sending and receiving, and wraps it in a suitable way to make it callable through the extended JUnit test runner. This class also accpets another parameter "batchSize" with a default of "1000". This tells the test how many messages to send before stopping sending and waiting for them all to come back. The actual value entered does not matter too much, but typically values larger than 1000 are used to ensure that there is a reasonable opportunity for simultaneous sending and receiving, and less than 10000 to ensure that each test method invocation does not go on for too long. -The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure peak performance. +The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure 'burst' performance. The reliability/burn in tests, test the broker running at slightly below its maximum throughput for a period of 24 hours. Their purpose is to check that the broker remains stable under load for a reasonable duration, in order to provide some confidence in the long-term stability of its process. These tests are intended to be run as a two step process. The first two tests run for 10 minutes and are used to asses the broker throughput for the test. The output from these tests are to be fed into the rate limiter for the second set of tests, so that the broker may be set up to run at slightly below its maximum throughput for the 24 hour duration. It is suggested that 90% of the rate achieved by the first two tests should be used for this. diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 17cc4a3be9..8d03755efc 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -305,16 +305,16 @@ -n PTM-Qpid-2-1M -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true transacted=false commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationsCount=1 rate=0 maxPending=20000000 - -n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results - -n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results - -n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results - -n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results - -n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results - -n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results - -n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results - -n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results - -n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results - -n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results + -n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results + -n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results + -n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results + -n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results + -n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results + -n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results + -n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results + -n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results + -n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results + -n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index c6cbac0ba8..6f62931338 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -1,7 +1,7 @@ package org.apache.qpid.server.failure; import junit.framework.TestCase; -import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.testutil.QpidClientConnectionHelper; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -17,7 +17,7 @@ public class HeapExhaustion extends TestCase { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnectionHelper conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -32,7 +32,7 @@ public class HeapExhaustion extends TestCase protected void setUp() throws Exception { - conn = new QpidClientConnection(BROKER); + conn = new QpidClientConnectionHelper(BROKER); conn.setVirtualHost(vhost); conn.connect(); diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java deleted file mode 100644 index d3f064293e..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java +++ /dev/null @@ -1,271 +0,0 @@ -package org.apache.qpid.testutil; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.JMSAMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; - -import javax.jms.ExceptionListener; -import javax.jms.Session; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.TextMessage; -import javax.jms.DeliveryMode; - -public class QpidClientConnection implements ExceptionListener -{ - - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); - - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection(String broker) - { - super(); - setVirtualHost("/test"); - setBrokerList(broker); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - sender.setDeliveryMode(deliveryMode); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) - { - connect(); - } - - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; - - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) - { - result = null; - } - else - { - _logger.info("warning: received non-text message"); - result = message; - } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - - session.commit(); - consumer.close(); - _logger.info("consumed: " + messagesReceived); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java new file mode 100644 index 0000000000..fcc3adee13 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java @@ -0,0 +1,275 @@ +package org.apache.qpid.testutil; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very + * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order + * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to + * provide the new features needed. + */ +public class QpidClientConnectionHelper implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnectionHelper(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + sender.setDeliveryMode(deliveryMode); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} -- cgit v1.2.1