diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-06-19 14:53:29 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-19 14:53:29 +0000 |
| commit | 89238b6c323ec9d5164fe630ac66b4b5ab2cfdcd (patch) | |
| tree | 61dd0bb31bb872563e89466ed01daccd85a2a892 /java | |
| parent | d43e1a5351598e558d01de9493c21870b3b0bcbb (diff) | |
| download | qpid-python-89238b6c323ec9d5164fe630ac66b4b5ab2cfdcd.tar.gz | |
Fixed outstanding merge issues and updates to trunk code that were required for sl4j.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@548753 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
8 files changed, 319 insertions, 259 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 58200a81b1..bad0d8a52d 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -49,6 +49,18 @@ </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.4.0</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.4.0</version> + </dependency> + + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> @@ -64,7 +76,7 @@ </dependency> <!-- Test Dependencies --> - <dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.4.0</version> @@ -180,8 +192,8 @@ <tasks> <condition property="broker.dir" - else="${user.dir}${file.separator}broker" - value="${user.dir}"> + else="${user.dir}${file.separator}broker" + value="${user.dir}"> <contains string="${user.dir}" substring="broker" /> </condition> diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ebbbac5e6e..50fbd47fe9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client; -import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; @@ -48,6 +47,10 @@ import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; @@ -65,6 +68,7 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; + import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; @@ -81,7 +85,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { - private static final Logger _logger = Logger.getLogger(AMQConnection.class); + private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private AtomicInteger _idFactory = new AtomicInteger(0); @@ -237,8 +241,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception was - * thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. + * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception + * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { @@ -1067,6 +1071,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public void exceptionReceived(Throwable cause) { + if (_logger.isDebugEnabled()) { _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c2b7bc26c4..763379fc29 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.client; -import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; @@ -74,6 +74,9 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -98,6 +101,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; + import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -109,14 +113,30 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> + * </table> + * + * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. + * + * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, + * after looking at worse bottlenecks first. */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { /** Used for debugging. */ - private static final Logger _logger = Logger.getLogger(AMQSession.class); + private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class); /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; @@ -190,8 +210,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked up - * in the {@link #_subscriptions} map. + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. */ private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); @@ -253,8 +273,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _suspended; /** - * Used to protect the suspension of this session, so that critical code can be executed during suspension, without - * the session being resumed by other threads. + * Used to protect the suspension of this session, so that critical code can be executed during suspension, + * without the session being resumed by other threads. */ private final Object _suspensionLock = new Object(); @@ -350,12 +370,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a new session on a connection with the default message factory factory. * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) @@ -364,6 +384,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi defaultPrefetchLow); } + // ===== JMS Session methods. + + /** + * Closes the session with no timeout. + * + * @throws JMSException If the JMS provider fails to close the session due to some internal error. + */ + public void close() throws JMSException + { + close(-1); + } + /** * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. * @@ -416,7 +448,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. + * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, @@ -444,30 +478,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Closes the session with no timeout. - * - * @throws JMSException If the JMS provider fails to close the session due to some internal error. - */ - public void close() throws JMSException - { - close(-1); - } - /** * Closes the session. * - * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close the - * channel. This is because the channel is marked as closed before the request to close it is made, so the fail-over - * should not re-open it. + * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * the channel. This is because the channel is marked as closed before the request to close it is made, so the + * fail-over should not re-open it. * * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. + * * @todo Be aware of possible changes to parameter order as versions change. - * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be re-opened. May - * need to examine this more carefully. - * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, because - * the failover process sends the failover event before acquiring the mutex itself. + * + * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be + * re-opened. May need to examine this more carefully. + * + * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -556,12 +584,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Commits all messages done in this transaction and releases any locks currently held. * * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the - * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The - * client will be unable to determine whether or not the commit actually happened on the broker in this case. + * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the commit actually happened on the broker in this case. * * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. + * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException @@ -917,6 +946,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1257,9 +1287,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * <p/>Restarting a session causes it to take the following actions: * - * <ul> <li>Stop message delivery.</li> <li>Mark all messages that might have been delivered but not acknowledged as - * "redelivered". <li>Restart the delivery sequence including all unacknowledged messages that had been previously - * delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.</li> </ul> + * <ul> + * <li>Stop message delivery.</li> + * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". + * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible @@ -1373,12 +1406,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Commits all messages done in this transaction and releases any locks currently held. * * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the - * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The - * client will be unable to determine whether or not the rollback actually happened on the broker in this case. + * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. + * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. * * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. + * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1650,6 +1684,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. * * @throws JMSException If the query fails for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. */ boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) @@ -1722,9 +1757,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. + * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages for - * each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -2084,6 +2120,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2128,7 +2165,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * the client. * * @throws AMQException If the queue cannot be declared for any reason. + * * @todo Verify the destiation is valid or throw an exception. + * * @todo Be aware of possible changes to parameter order as versions change. */ private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) @@ -2172,6 +2211,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. */ private void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2460,6 +2500,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. + * * @todo Be aware of possible changes to parameter order as versions change. */ private void suspendChannel(boolean suspend) throws AMQException // , FailoverException diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 8e66aec0d6..fb2d72267b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,14 +20,15 @@ */ package org.apache.qpid.client.failover; -import org.apache.log4j.Logger; - import org.apache.mina.common.IoSession; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.CountDownLatch; /** @@ -78,7 +79,7 @@ import java.util.concurrent.CountDownLatch; public class FailoverHandler implements Runnable { /** Used for debugging. */ - private static final Logger _logger = Logger.getLogger(FailoverHandler.class); + private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class); /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */ private final IoSession _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d2d29039ea..a00078b010 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.client.protocol; -import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; + import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -56,6 +56,9 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -63,10 +66,10 @@ import java.util.concurrent.CountDownLatch; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the - * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the event - * on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, expressed in - * terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in terms of "message - * received" and so on. + * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the + * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, + * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in + * terms of "message received" and so on. * * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public @@ -76,52 +79,56 @@ import java.util.concurrent.CountDownLatch; * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol - * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA - * sessions in the event of failover. See below for more information about this. + * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions + * in the event of failover. See below for more information about this. * * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named - * attributes. A more convenient, type-safe, container for session data is provided in the form of {@link - * AMQProtocolSession}. + * attributes. A more convenient, type-safe, container for session data is provided in the form of + * {@link AMQProtocolSession}. * * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session - * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe - * wrapper as described above). This event handler is different, because dealing with failover complicates things. To - * the end client of an AMQConnection, a failed over connection is still handled through the same connection instance, - * but behind the scenes a new transport connection, and MINA session will have been created. The MINA session object - * cannot be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the - * old connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection + * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper + * as described above). This event handler is different, because dealing with failover complicates things. To the + * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but + * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot + * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old + * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. * - * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. The - * filter chain is set up as a stack of event handers that perform the following functions (working upwards from the - * network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, optionally - * handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. + * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from + * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, + * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Create the - * filter chain to filter this handlers events. <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link - * SSLFilter}, {@link ReadWriteThreadModel}. + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create the filter chain to filter this handlers events. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. * - * <tr><td> Maintain fail-over state. <tr><td> </table> + * <tr><td> Maintain fail-over state. + * <tr><td> + * </table> * * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec filter - * before it mean not doing the read/write asynchronously but in the main filter thread? + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? + * * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of AMQProtocolSesssion - * and AMQConnection will be the same, so if there is high cohesion between them, they could be merged, although there - * is sense in keeping the session model seperate. Will clarify things by having data held per protocol handler, per - * protocol session, per network connection, per channel, in seperate classes, so that lifecycles of the fields match - * lifecycles of their containing objects. + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler extends IoHandlerAdapter { /** Used for debugging. */ - private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); + private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection + * instances and protocol handler instances. */ private AMQConnection _connection; @@ -165,8 +172,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events to an - * asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP. * * @param session The MINA session. * @@ -215,8 +222,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * Called when the network connection is closed. This can happen, either because the client explicitly requested - * that the connection be closed, in which case nothing is done, or because the connection died. In the case where - * the connection died, an attempt to failover automatically to a new connection may be started. The failover + * that the connection be closed, in which case nothing is done, or because the connection died. In the case + * where the connection died, an attempt to failover automatically to a new connection may be started. The failover * process will be started, provided that it is the clients policy to allow failover, and provided that a failover * has not already been started or failed. * @@ -227,7 +234,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param session The MINA session. * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. + * not otherwise? The above comment doesn't make that clear. */ public void sessionClosed(IoSession session) { @@ -414,10 +421,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter final AMQMethodListener listener = (AMQMethodListener) it.next(); wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } - if (!wasAnyoneInterested) - { - throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners, null); - } } if (!wasAnyoneInterested) diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java index 5bd7b9aea0..47eff4be19 100644 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java +++ b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java @@ -1,13 +1,13 @@ package org.apache.qpid.util.concurrent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
@@ -44,7 +44,6 @@ public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements {
/** Used for logging. */
private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
-
/** Holds a reference to the queue implementation that holds the buffer. */
Queue<SynchRecordImpl<E>> buffer;
diff --git a/java/common/templates/model/MethodRegistryClass.tmpl b/java/common/templates/model/MethodRegistryClass.tmpl index 82a385f20c..474d9e31d1 100644 --- a/java/common/templates/model/MethodRegistryClass.tmpl +++ b/java/common/templates/model/MethodRegistryClass.tmpl @@ -1,160 +1,159 @@ -&{MainRegistry.java} -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by ${GENERATOR} - do not modify. - * Supported AMQP versions: -%{VLIST} * ${major}-${minor} - */ - -package org.apache.qpid.framing; - -import java.util.HashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.mina.common.ByteBuffer; - -public class MainRegistry -{ - private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>(); - - - private static final Logger _log = LoggerFactory.getLogger(MainRegistry.class); - - - - private static final int DEFAULT_MINOR_VERSION_COUNT = 10; - private static final int DEFAULT_MAJOR_VERSION_COUNT = 10; - - private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][]; - - static - { -%{CLIST} ${reg_map_put_method} - - configure(); - } - - public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size) - throws AMQFrameDecodingException - { - VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor); - AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID); - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(null, - "Unable to find a suitable decoder for class " + classID + " and method " + - methodID + " in AMQP version " + major + "-" + minor + ".", null); - } - return bodyFactory.newInstance(major, minor, in, size); - - - } - - - public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv) - { - return getVersionSpecificRegistry(pv.getMajorVersion(), pv.getMinorVersion()); - } - public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor) - { - try - { - return _specificRegistries[(int)major][(int)minor]; - } - catch (IndexOutOfBoundsException e) - { - return null; - } - catch (NullPointerException e) - { - return null; - } - - - } - - private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor) - { - VersionSpecificRegistry[][] registries = _specificRegistries; - if(major >= registries.length) - { - _specificRegistries = new VersionSpecificRegistry[(int)major + 1][]; - System.arraycopy(registries, 0, _specificRegistries, 0, registries.length); - registries = _specificRegistries; - } - if(registries[major] == null) - { - registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ]; - } - else if(registries[major].length <= minor) - { - VersionSpecificRegistry[] minorArray = registries[major]; - registries[major] = new VersionSpecificRegistry[ minor + 1 ]; - System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length); - - } - - VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor); - - registries[major][minor] = newRegistry; - - return newRegistry; - } - - private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory ) - { - VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor); - if(registry == null) - { - registry = addVersionSpecificRegistry(major,minor); - - } - - registry.registerMethod(classID, methodID, instanceFactory); - - } - - - private static void configure() - { - for(int i = 0 ; i < _specificRegistries.length; i++) - { - VersionSpecificRegistry[] registries = _specificRegistries[i]; - if(registries != null) - { - for(int j = 0 ; j < registries.length; j++) - { - VersionSpecificRegistry registry = registries[j]; - - if(registry != null) - { - registry.configure(); - } - } - } - } - - } - -} +&{MainRegistry.java}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by ${GENERATOR} - do not modify.
+ * Supported AMQP versions:
+%{VLIST} * ${major}-${minor}
+ */
+
+package org.apache.qpid.framing;
+
+import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.mina.common.ByteBuffer;
+
+public class MainRegistry
+{
+ private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
+
+
+ private static final Logger _log = LoggerFactory.getLogger(MainRegistry.class);
+
+
+ private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+ private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+
+ private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+
+ static
+ {
+%{CLIST} ${reg_map_put_method}
+
+ configure();
+ }
+
+ public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+ AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null,
+ "Unable to find a suitable decoder for class " + classID + " and method " +
+ methodID + " in AMQP version " + major + "-" + minor + ".", null);
+ }
+ return bodyFactory.newInstance(major, minor, in, size);
+
+
+ }
+
+
+ public static VersionSpecificRegistry getVersionSpecificRegistry(ProtocolVersion pv)
+ {
+ return getVersionSpecificRegistry(pv.getMajorVersion(), pv.getMinorVersion());
+ }
+ public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+ {
+ try
+ {
+ return _specificRegistries[(int)major][(int)minor];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+
+
+ }
+
+ private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+ {
+ VersionSpecificRegistry[][] registries = _specificRegistries;
+ if(major >= registries.length)
+ {
+ _specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+ System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+ registries = _specificRegistries;
+ }
+ if(registries[major] == null)
+ {
+ registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+ }
+ else if(registries[major].length <= minor)
+ {
+ VersionSpecificRegistry[] minorArray = registries[major];
+ registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+ System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+
+ }
+
+ VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+
+ registries[major][minor] = newRegistry;
+
+ return newRegistry;
+ }
+
+ private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
+ {
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+ if(registry == null)
+ {
+ registry = addVersionSpecificRegistry(major,minor);
+
+ }
+
+ registry.registerMethod(classID, methodID, instanceFactory);
+
+ }
+
+
+ private static void configure()
+ {
+ for(int i = 0 ; i < _specificRegistries.length; i++)
+ {
+ VersionSpecificRegistry[] registries = _specificRegistries[i];
+ if(registries != null)
+ {
+ for(int j = 0 ; j < registries.length; j++)
+ {
+ VersionSpecificRegistry registry = registries[j];
+
+ if(registry != null)
+ {
+ registry.configure();
+ }
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java index f89dc73bc6..012a983be5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java +++ b/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java @@ -82,7 +82,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailure() throws Exception + public void testUntilFailureTransient() throws Exception { int copies = 0; int total = 0; @@ -102,7 +102,7 @@ public class HeapExhaustion extends TestCase * * @throws Exception on error */ - public void testUntilFailureWithDelays() throws Exception + public void testUntilFailureWithDelaysTransient() throws Exception { int copies = 0; int total = 0; @@ -137,7 +137,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailure(); + he.testUntilFailureTransient(); } catch (FailoverException fe) { @@ -186,7 +186,7 @@ public class HeapExhaustion extends TestCase _logger.info("Running testUntilFailure"); try { - he.testUntilFailureWithDelays(); + he.testUntilFailureWithDelaysTransient(); } catch (FailoverException fe) { |
