diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
| commit | e7f02a8b8b25d9fcce6525ccc5b794f8438995f0 (patch) | |
| tree | 20179efb250c6351d7012b29fa8104558b83780f /java/common/src/main | |
| parent | cf47f99d276a50ac32ed9835a9afb818fd90f4ba (diff) | |
| download | qpid-python-e7f02a8b8b25d9fcce6525ccc5b794f8438995f0.tar.gz | |
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src/main')
4 files changed, 123 insertions, 21 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 20a30b3ed3..8152a1f5e9 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,10 +22,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -36,7 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; * * <p/><table id="crc><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide a shared exector service. <td> {@link Executors} + * <tr><td> Provide a shared executor service. <td> {@link Executors} * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService} * <tr><td> Track references to the executor service. * <tr><td> Provide configuration of the executor service. @@ -53,13 +55,15 @@ import java.util.concurrent.LinkedBlockingQueue; * @todo {@link #_poolSize} should be static? * * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used - * further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility + * further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors. */ public class ReferenceCountingExecutorService { + + /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */ private static final int MINIMUM_POOL_SIZE = 4; @@ -87,6 +91,11 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + /** Thread Factory used to create thread of the pool. Uses the default implementation provided by + * {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller. + */ + private ThreadFactory _threadFactory = Executors.defaultThreadFactory(); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); /** @@ -116,19 +125,23 @@ public class ReferenceCountingExecutorService { if (_refCount++ == 0) { -// _pool = Executors.newFixedThreadPool(_poolSize); - // Use a job queue that biases to writes if(_useBiasedPool) { _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); + new ReadWriteJobQueue(), + _threadFactory); + } else { - _pool = Executors.newFixedThreadPool(_poolSize); + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + _threadFactory); } + } @@ -137,7 +150,7 @@ public class ReferenceCountingExecutorService } /** - * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls + * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls * to zero, the executor service is shut down. */ public void releaseExecutorService() @@ -169,4 +182,34 @@ public class ReferenceCountingExecutorService { return _refCount; } + + /** + * + * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * + * @return thread factory + */ + public ThreadFactory getThreadFactory() + { + return _threadFactory; + } + + /** + * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * <p> + * If the pool has been already created, the change will have no effect until + * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason, + * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}. + * + * @param threadFactory thread factory + */ + public void setThreadFactory(final ThreadFactory threadFactory) + { + if (threadFactory == null) + { + throw new NullPointerException("threadFactory cannot be null"); + } + _threadFactory = threadFactory; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java index 9786d8fc3f..a96dac4109 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,29 +19,25 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; -public class DefaultThreadFactory implements ThreadFactory -{ - - private static class QpidThread extends Thread - { - private QpidThread(final Runnable target) - { - super(target); - } - } +public class DefaultThreadFactory implements ThreadFactory +{ + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); public Thread createThread(Runnable r) { - return new Thread(r); + Thread t = new Thread(r); + t.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return t; } public Thread createThread(Runnable r, int priority) { - Thread t = new Thread(r); + Thread t = createThread(r); t.setPriority(priority); return t; } diff --git a/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java new file mode 100644 index 0000000000..192675edcd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.thread; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * An {@link UncaughtExceptionHandler} that writes the exception to the application log via + * the SLF4J framework. Once registered with {@link Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)} + * it will be invoked by the JVM when a thread has been <i>abruptly</i> terminated due to an uncaught exception. + * Owing to the contract of {@link Runnable#run()}, the only possible exception types which can cause such a termination + * are instances of {@link RuntimeException} and {@link Error}. These exceptions are catastrophic and the client must + * restart the JVM. + * <p> + * The implementation also invokes {@link ThreadGroup#uncaughtException(Thread, Throwable)}. This + * is done to retain compatibility with any monitoring solutions (for example, log scraping of + * standard error) that existing users of older Qpid client libraries may have in place. + * + */ +public class LoggingUncaughtExceptionHandler implements UncaughtExceptionHandler +{ + private static final Logger _logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(Thread t, Throwable e) + { + try + { + _logger.error("Uncaught exception in thread \"{}\"", t.getName(), e); + } + finally + { + // Invoke the thread group's handler too for compatibility with any + // existing clients who are already scraping stderr for such conditions. + t.getThreadGroup().uncaughtException(t, e); + } + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java index 0507b3108f..95a8d192c5 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -25,6 +25,8 @@ import java.lang.reflect.Constructor; public class RealtimeThreadFactory implements ThreadFactory { + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); + private Class threadClass; private Constructor threadConstructor; private Constructor priorityParameterConstructor; @@ -62,7 +64,9 @@ public class RealtimeThreadFactory implements ThreadFactory public Thread createThread(Runnable r, int priority) throws Exception { Object priorityParams = priorityParameterConstructor.newInstance(priority); - return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + Thread thread = (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + thread.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return thread; } } |
