summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-19 14:53:29 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-19 14:53:29 +0000
commit89238b6c323ec9d5164fe630ac66b4b5ab2cfdcd (patch)
tree61dd0bb31bb872563e89466ed01daccd85a2a892 /java/client/src
parentd43e1a5351598e558d01de9493c21870b3b0bcbb (diff)
downloadqpid-python-89238b6c323ec9d5164fe630ac66b4b5ab2cfdcd.tar.gz
Fixed outstanding merge issues and updates to trunk code that were required for sl4j.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548753 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java117
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java89
4 files changed, 138 insertions, 88 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ebbbac5e6e..50fbd47fe9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
@@ -48,6 +47,10 @@ import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
@@ -65,6 +68,7 @@ import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
+
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
@@ -81,7 +85,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
- private static final Logger _logger = Logger.getLogger(AMQConnection.class);
+ private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private AtomicInteger _idFactory = new AtomicInteger(0);
@@ -237,8 +241,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception was
- * thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+ * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
+ * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
@@ -1067,6 +1071,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public void exceptionReceived(Throwable cause)
{
+
if (_logger.isDebugEnabled())
{
_logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c2b7bc26c4..763379fc29 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.client;
-import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -74,6 +74,9 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -98,6 +101,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -109,14 +113,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ *
+ * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
+ *
+ * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
+ * after looking at worse bottlenecks first.
*/
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
/** Used for debugging. */
- private static final Logger _logger = Logger.getLogger(AMQSession.class);
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
/** Used for debugging in the dispatcher. */
- private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+ private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
/** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -190,8 +210,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
- * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked up
- * in the {@link #_subscriptions} map.
+ * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
+ * up in the {@link #_subscriptions} map.
*/
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
@@ -253,8 +273,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _suspended;
/**
- * Used to protect the suspension of this session, so that critical code can be executed during suspension, without
- * the session being resumed by other threads.
+ * Used to protect the suspension of this session, so that critical code can be executed during suspension,
+ * without the session being resumed by other threads.
*/
private final Object _suspensionLock = new Object();
@@ -350,12 +370,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Creates a new session on a connection with the default message factory factory.
*
- * @param con The connection on which to create the session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
@@ -364,6 +384,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
defaultPrefetchLow);
}
+ // ===== JMS Session methods.
+
+ /**
+ * Closes the session with no timeout.
+ *
+ * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ */
+ public void close() throws JMSException
+ {
+ close(-1);
+ }
+
/**
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
@@ -416,7 +448,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param exchangeName The exchange to bind the queue on.
*
* @throws AMQException If the queue cannot be bound for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
+ *
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -444,30 +478,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Closes the session with no timeout.
- *
- * @throws JMSException If the JMS provider fails to close the session due to some internal error.
- */
- public void close() throws JMSException
- {
- close(-1);
- }
- /**
* Closes the session.
*
- * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close the
- * channel. This is because the channel is marked as closed before the request to close it is made, so the fail-over
- * should not re-open it.
+ * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
+ * the channel. This is because the channel is marked as closed before the request to close it is made, so the
+ * fail-over should not re-open it.
*
* @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
- * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be re-opened. May
- * need to examine this more carefully.
- * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, because
- * the failover process sends the failover event before acquiring the mutex itself.
+ *
+ * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
+ * re-opened. May need to examine this more carefully.
+ *
+ * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
+ * because the failover process sends the failover event before acquiring the mutex itself.
*/
public void close(long timeout) throws JMSException
{
@@ -556,12 +584,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Commits all messages done in this transaction and releases any locks currently held.
*
* <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
- * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
- * client will be unable to determine whether or not the commit actually happened on the broker in this case.
+ * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+ * The client will be unable to determine whether or not the commit actually happened on the broker in this case.
*
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
@@ -917,6 +946,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param exclusive Flag to indicate that the queue is exclusive to this client.
*
* @throws AMQException If the queue cannot be declared for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1257,9 +1287,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* <p/>Restarting a session causes it to take the following actions:
*
- * <ul> <li>Stop message delivery.</li> <li>Mark all messages that might have been delivered but not acknowledged as
- * "redelivered". <li>Restart the delivery sequence including all unacknowledged messages that had been previously
- * delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.</li> </ul>
+ * <ul>
+ * <li>Stop message delivery.</li>
+ * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
+ * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+ * </ul>
*
* <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
* receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
@@ -1373,12 +1406,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Commits all messages done in this transaction and releases any locks currently held.
*
* <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the
- * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
- * client will be unable to determine whether or not the rollback actually happened on the broker in this case.
+ * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown.
+ * The client will be unable to determine whether or not the rollback actually happened on the broker in this case.
*
* @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
* not mean that the rollback is known to have failed, merely that it is not known whether it
* failed or not.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void rollback() throws JMSException
@@ -1650,6 +1684,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
*
* @throws JMSException If the query fails for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
@@ -1722,9 +1757,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
* @throws AMQException If the session cannot be started for any reason.
+ *
* @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
- * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages for
- * each subsequent call to flow.. only need to do this if we have called stop.
+ * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+ * for each subsequent call to flow.. only need to do this if we have called stop.
*/
void start() throws AMQException
{
@@ -2084,6 +2120,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param nowait
*
* @throws AMQException If the exchange cannot be declared for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2128,7 +2165,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* the client.
*
* @throws AMQException If the queue cannot be declared for any reason.
+ *
* @todo Verify the destiation is valid or throw an exception.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
@@ -2172,6 +2211,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName The name of the queue to delete.
*
* @throws JMSException If the queue could not be deleted for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2460,6 +2500,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* should be unsuspended.
*
* @throws AMQException If the session cannot be suspended for any reason.
+ *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void suspendChannel(boolean suspend) throws AMQException // , FailoverException
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 8e66aec0d6..fb2d72267b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.client.failover;
-import org.apache.log4j.Logger;
-
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.CountDownLatch;
/**
@@ -78,7 +79,7 @@ import java.util.concurrent.CountDownLatch;
public class FailoverHandler implements Runnable
{
/** Used for debugging. */
- private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
+ private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
/** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
private final IoSession _session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index d2d29039ea..a00078b010 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -56,6 +56,9 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -63,10 +66,10 @@ import java.util.concurrent.CountDownLatch;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
- * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event
- * on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in
- * terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message
- * received" and so on.
+ * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
+ * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
+ * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
+ * terms of "message received" and so on.
*
* <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
* exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
@@ -76,52 +79,56 @@ import java.util.concurrent.CountDownLatch;
* <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
* there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
* JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
- * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA
- * sessions in the event of failover. See below for more information about this.
+ * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
+ * in the event of failover. See below for more information about this.
*
* <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
- * attributes. A more convenient, type-safe, container for session data is provided in the form of {@link
- * AMQProtocolSession}.
+ * attributes. A more convenient, type-safe, container for session data is provided in the form of
+ * {@link AMQProtocolSession}.
*
* <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
- * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe
- * wrapper as described above). This event handler is different, because dealing with failover complicates things. To
- * the end client of an AMQConnection, a failed over connection is still handled through the same connection instance,
- * but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object
- * cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the
- * old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
+ * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
+ * as described above). This event handler is different, because dealing with failover complicates things. To the
+ * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
+ * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
+ * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
+ * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
* and the protocol session data is held outside of the MINA IOSession.
*
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. The
- * filter chain is set up as a stack of event handers that perform the following functions (working upwards from the
- * network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally
- * handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
+ * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
+ * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
+ * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
*
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Create the
- * filter chain to filter this handlers events. <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link
- * SSLFilter}, {@link ReadWriteThreadModel}.
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create the filter chain to filter this handlers events.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
*
- * <tr><td> Maintain fail-over state. <tr><td> </table>
+ * <tr><td> Maintain fail-over state.
+ * <tr><td>
+ * </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec filter
- * before it mean not doing the read/write asynchronously but in the main filter thread?
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
+ *
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSesssion
- * and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there
- * is sense in keeping the session model seperate. Will clarify things by having data held per protocol handler, per
- * protocol session, per network connection, per channel, in seperate classes, so that lifecycles of the fields match
- * lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
/** Used for debugging. */
- private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
- * and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
+ * instances and protocol handler instances.
*/
private AMQConnection _connection;
@@ -165,8 +172,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/**
* Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events to an
- * asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
+ * session, which filters the events handled by this handler. The filter chain consists of, handing off events
+ * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
*
* @param session The MINA session.
*
@@ -215,8 +222,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
- * that the connection be closed, in which case nothing is done, or because the connection died. In the case where
- * the connection died, an attempt to failover automatically to a new connection may be started. The failover
+ * that the connection be closed, in which case nothing is done, or because the connection died. In the case
+ * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
* process will be started, provided that it is the clients policy to allow failover, and provided that a failover
* has not already been started or failed.
*
@@ -227,7 +234,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -414,10 +421,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQMethodListener listener = (AMQMethodListener) it.next();
wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
- if (!wasAnyoneInterested)
- {
- throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null);
- }
}
if (!wasAnyoneInterested)