From e7f02a8b8b25d9fcce6525ccc5b794f8438995f0 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 4 Feb 2011 08:14:00 +0000 Subject: QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads. Applied patch from Keith Wall git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68 --- .../pool/ReferenceCountingExecutorService.java | 57 +++++++- .../apache/qpid/thread/DefaultThreadFactory.java | 21 ++- .../thread/LoggingUncaughtExceptionHandler.java | 60 ++++++++ .../apache/qpid/thread/RealtimeThreadFactory.java | 6 +- .../pool/ReferenceCountingExecutorServiceTest.java | 159 +++++++++++++++++++++ .../org/apache/qpid/thread/ThreadFactoryTest.java | 63 ++++++-- 6 files changed, 332 insertions(+), 34 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java create mode 100644 java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 20a30b3ed3..8152a1f5e9 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,10 +22,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -36,7 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; * *

(), + _threadFactory); } + } @@ -137,7 +150,7 @@ public class ReferenceCountingExecutorService } /** - * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls + * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls * to zero, the executor service is shut down. */ public void releaseExecutorService() @@ -169,4 +182,34 @@ public class ReferenceCountingExecutorService { return _refCount; } + + /** + * + * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * + * @return thread factory + */ + public ThreadFactory getThreadFactory() + { + return _threadFactory; + } + + /** + * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + *

+ * If the pool has been already created, the change will have no effect until + * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason, + * callers must invoke this method before calling {@link #acquireExecutorService()}. + * + * @param threadFactory thread factory + */ + public void setThreadFactory(final ThreadFactory threadFactory) + { + if (threadFactory == null) + { + throw new NullPointerException("threadFactory cannot be null"); + } + _threadFactory = threadFactory; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java index 9786d8fc3f..a96dac4109 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,29 +19,25 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; -public class DefaultThreadFactory implements ThreadFactory -{ - - private static class QpidThread extends Thread - { - private QpidThread(final Runnable target) - { - super(target); - } - } +public class DefaultThreadFactory implements ThreadFactory +{ + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); public Thread createThread(Runnable r) { - return new Thread(r); + Thread t = new Thread(r); + t.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return t; } public Thread createThread(Runnable r, int priority) { - Thread t = new Thread(r); + Thread t = createThread(r); t.setPriority(priority); return t; } diff --git a/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java new file mode 100644 index 0000000000..192675edcd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.thread; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * An {@link UncaughtExceptionHandler} that writes the exception to the application log via + * the SLF4J framework. Once registered with {@link Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)} + * it will be invoked by the JVM when a thread has been abruptly terminated due to an uncaught exception. + * Owing to the contract of {@link Runnable#run()}, the only possible exception types which can cause such a termination + * are instances of {@link RuntimeException} and {@link Error}. These exceptions are catastrophic and the client must + * restart the JVM. + *

+ * The implementation also invokes {@link ThreadGroup#uncaughtException(Thread, Throwable)}. This + * is done to retain compatibility with any monitoring solutions (for example, log scraping of + * standard error) that existing users of older Qpid client libraries may have in place. + * + */ +public class LoggingUncaughtExceptionHandler implements UncaughtExceptionHandler +{ + private static final Logger _logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(Thread t, Throwable e) + { + try + { + _logger.error("Uncaught exception in thread \"{}\"", t.getName(), e); + } + finally + { + // Invoke the thread group's handler too for compatibility with any + // existing clients who are already scraping stderr for such conditions. + t.getThreadGroup().uncaughtException(t, e); + } + } +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java index 0507b3108f..95a8d192c5 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -25,6 +25,8 @@ import java.lang.reflect.Constructor; public class RealtimeThreadFactory implements ThreadFactory { + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); + private Class threadClass; private Constructor threadConstructor; private Constructor priorityParameterConstructor; @@ -62,7 +64,9 @@ public class RealtimeThreadFactory implements ThreadFactory public Thread createThread(Runnable r, int priority) throws Exception { Object priorityParams = priorityParameterConstructor.newInstance(priority); - return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + Thread thread = (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + thread.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return thread; } } diff --git a/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java new file mode 100644 index 0000000000..35998de3a1 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + + +public class ReferenceCountingExecutorServiceTest extends TestCase +{ + + + private ReferenceCountingExecutorService _executorService = ReferenceCountingExecutorService.getInstance(); // Class under test + private ThreadFactory _beforeExecutorThreadFactory; + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _beforeExecutorThreadFactory = _executorService.getThreadFactory(); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + _executorService.setThreadFactory(_beforeExecutorThreadFactory); + } + + + + /** + * Tests that the ReferenceCountingExecutorService correctly manages the reference count. + */ + public void testReferenceCounting() throws Exception + { + final int countBefore = _executorService.getReferenceCount(); + + try + { + _executorService.acquireExecutorService(); + _executorService.acquireExecutorService(); + + assertEquals("Reference count should now be +2", countBefore + 2, _executorService.getReferenceCount()); + } + finally + { + _executorService.releaseExecutorService(); + _executorService.releaseExecutorService(); + } + assertEquals("Reference count should have returned to the initial value", countBefore, _executorService.getReferenceCount()); + } + + /** + * Tests that the executor creates and executes a task using the default thread pool. + */ + public void testExecuteCommandWithDefaultExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final Set threadGroups = new HashSet(); + + _executorService.acquireExecutorService(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using default thread factory", + threadGroups.contains(Thread.currentThread().getThreadGroup())); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + /** + * Tests that the executor creates and executes a task using an overridden thread pool. + */ + public void testExecuteCommandWithOverriddenExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final ThreadGroup expectedThreadGroup = new ThreadGroup("junit"); + _executorService.setThreadFactory(new ThreadGroupChangingThreadFactory(expectedThreadGroup)); + _executorService.acquireExecutorService(); + + final Set threadGroups = new HashSet(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using overridden thread factory", + threadGroups.contains(expectedThreadGroup)); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + private Runnable createRunnable(final CountDownLatch latch, final Set threadGroups) + { + return new Runnable() + { + + public void run() + { + threadGroups.add(Thread.currentThread().getThreadGroup()); + latch.countDown(); + } + + }; + } + + private final class ThreadGroupChangingThreadFactory implements ThreadFactory + { + private final ThreadGroup _newGroup; + + private ThreadGroupChangingThreadFactory(final ThreadGroup newGroup) + { + this._newGroup = newGroup; + } + + public Thread newThread(Runnable r) + { + return new Thread(_newGroup, r); + } + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java index 7f17592893..7b0f93700a 100644 --- a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java +++ b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,18 +19,22 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; import junit.framework.TestCase; +/** + * Tests the ThreadFactory. + */ public class ThreadFactoryTest extends TestCase { public void testThreadFactory() { - Class threadFactoryClass = null; + Class threadFactoryClass = null; try { threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory", - "org.apache.qpid.thread.DefaultThreadFactory")); + "org.apache.qpid.thread.DefaultThreadFactory")).asSubclass(ThreadFactory.class); } // If the thread factory class was wrong it will flagged way before it gets here. catch(Exception e) @@ -41,20 +44,19 @@ public class ThreadFactoryTest extends TestCase assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass()); } - - public void testThreadCreate() + + /** + * Tests creating a thread without a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithDefaultPriority() { - Runnable r = new Runnable(){ - - public void run(){ - - } - }; + Runnable r = createRunnable(); Thread t = null; try { - t = Threading.getThreadFactory().createThread(r,5); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { @@ -62,6 +64,41 @@ public class ThreadFactoryTest extends TestCase } assertNotNull(t); - assertEquals(5,t.getPriority()); + assertEquals(Thread.NORM_PRIORITY, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + /** + * Tests creating thread with a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithSpecifiedPriority() + { + Runnable r = createRunnable(); + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r, 4); + } + catch(Exception e) + { + fail("Error creating thread using Qpid thread factory"); + } + + assertNotNull(t); + assertEquals(4, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + private Runnable createRunnable() + { + Runnable r = new Runnable(){ + + public void run(){ + + } + }; + return r; } } -- cgit v1.2.1