diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
| commit | e7f02a8b8b25d9fcce6525ccc5b794f8438995f0 (patch) | |
| tree | 20179efb250c6351d7012b29fa8104558b83780f /java/client/src | |
| parent | cf47f99d276a50ac32ed9835a9afb818fd90f4ba (diff) | |
| download | qpid-python-e7f02a8b8b25d9fcce6525ccc5b794f8438995f0.tar.gz | |
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 37 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 39 |
2 files changed, 57 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5c41e483c..1f940b62f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -363,7 +362,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing to enure that an auto ack is not sent. + * onMessage() processing to ensure that an auto ack is not sent. */ private boolean _inRecovery; @@ -383,7 +382,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Object _suspensionLock = new Object(); /** - * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. + * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. * * @todo This is accessed only within a synchronized method, so does not need to be atomic. */ @@ -429,7 +428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. @@ -475,7 +474,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // flow control if (!(_thisSession.isClosed() || _thisSession.isClosing())) { - // Only executute change if previous state + // Only execute change if previous state // was False if (!_suspendState.getAndSet(true)) { @@ -485,7 +484,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic "Above threshold(" + _prefetchHighMark + ") so suspending channel. Current value is " + currentValue); } - new Thread(new SuspenderRunner(_suspendState)).start(); + try + { + Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } } } } @@ -496,7 +502,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // flow control if (!(_thisSession.isClosed() || _thisSession.isClosing())) { - // Only executute change if previous state + // Only execute change if previous state // was true if (_suspendState.getAndSet(false)) { @@ -507,7 +513,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic "Below threshold(" + _prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); } - new Thread(new SuspenderRunner(_suspendState)).start(); + try + { + Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } } } } @@ -531,7 +544,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ @@ -562,7 +575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (IllegalStateException ise) { - // if the Connection has closed then we should throw any exception that has occured that we were not waiting for + // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) @@ -677,11 +690,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Closes the session. * - * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close * the channel. This is because the channel is marked as closed before the request to close it is made, so the * fail-over should not re-open it. * - * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. * @todo Be aware of possible changes to parameter order as versions change. diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c16941b341..eb5af119b2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.mina.filter.codec.ProtocolCodecException; @@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; @@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory; * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. * - * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. @@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory; * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * be merged, although there is sense in keeping the session model separate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler implements ProtocolEngine @@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ + /** The last failover exception that occurred */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ @@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); @@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if(!_connection.isClosed()) { - Thread failoverThread = new Thread(_failoverHandler); + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } failoverThread.setName("Failover"); // Do not inherit daemon-ness from current thread as this can be a daemon // thread such as a AnonymousIoService thread. @@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any * protocol level waits. * * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should @@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine } //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be - // interupted unless failover cannot restore the state. + // interrupted unless failover cannot restore the state. propagateExceptionToFrameListeners(_lastFailoverException); } |
