summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
committerRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
commite7f02a8b8b25d9fcce6525ccc5b794f8438995f0 (patch)
tree20179efb250c6351d7012b29fa8104558b83780f /java/client/src
parentcf47f99d276a50ac32ed9835a9afb818fd90f4ba (diff)
downloadqpid-python-e7f02a8b8b25d9fcce6525ccc5b794f8438995f0.tar.gz
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java39
2 files changed, 57 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b5c41e483c..1f940b62f0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -363,7 +362,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
- * onMessage() processing to enure that an auto ack is not sent.
+ * onMessage() processing to ensure that an auto ack is not sent.
*/
private boolean _inRecovery;
@@ -383,7 +382,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Object _suspensionLock = new Object();
/**
- * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel.
+ * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
*
* @todo This is accessed only within a synchronized method, so does not need to be atomic.
*/
@@ -429,7 +428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -475,7 +474,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// flow control
if (!(_thisSession.isClosed() || _thisSession.isClosing()))
{
- // Only executute change if previous state
+ // Only execute change if previous state
// was False
if (!_suspendState.getAndSet(true))
{
@@ -485,7 +484,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
"Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
}
- new Thread(new SuspenderRunner(_suspendState)).start();
+ try
+ {
+ Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
}
}
}
@@ -496,7 +502,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// flow control
if (!(_thisSession.isClosed() || _thisSession.isClosing()))
{
- // Only executute change if previous state
+ // Only execute change if previous state
// was true
if (_suspendState.getAndSet(false))
{
@@ -507,7 +513,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
"Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
}
- new Thread(new SuspenderRunner(_suspendState)).start();
+ try
+ {
+ Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
}
}
}
@@ -531,7 +544,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -562,7 +575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (IllegalStateException ise)
{
- // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
+ // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
@@ -677,11 +690,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Closes the session.
*
- * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
+ * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close
* the channel. This is because the channel is marked as closed before the request to close it is made, so the
* fail-over should not re-open it.
*
- * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
+ * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
* @todo Be aware of possible changes to parameter order as versions change.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c16941b341..eb5af119b2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecException;
@@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
@@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory;
* connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
* and the protocol session data is held outside of the MINA IOSession.
*
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through.
* The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
* the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
* optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
@@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory;
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
* failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
* AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler implements ProtocolEngine
@@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
- /** The last failover exception that occured */
+ /** The last failover exception that occurred */
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
@@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _poolReference.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread(final Runnable runnable)
+ {
+ try
+ {
+ return Threading.getThreadFactory().createThread(runnable);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
+ }
+ });
_readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
_writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_poolReference.acquireExecutorService();
@@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if(!_connection.isClosed())
{
- Thread failoverThread = new Thread(_failoverHandler);
+ final Thread failoverThread;
+ try
+ {
+ failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
failoverThread.setName("Failover");
// Do not inherit daemon-ness from current thread as this can be a daemon
// thread such as a AnonymousIoService thread.
@@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
/**
- * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+ * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any
* protocol level waits.
*
* This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
@@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
//Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
- // interupted unless failover cannot restore the state.
+ // interrupted unless failover cannot restore the state.
propagateExceptionToFrameListeners(_lastFailoverException);
}