From e7f02a8b8b25d9fcce6525ccc5b794f8438995f0 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 4 Feb 2011 08:14:00 +0000 Subject: QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads. Applied patch from Keith Wall git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 37 +++-- .../qpid/client/protocol/AMQProtocolHandler.java | 39 ++++- .../pool/ReferenceCountingExecutorService.java | 57 +++++++- .../apache/qpid/thread/DefaultThreadFactory.java | 21 ++- .../thread/LoggingUncaughtExceptionHandler.java | 60 ++++++++ .../apache/qpid/thread/RealtimeThreadFactory.java | 6 +- .../pool/ReferenceCountingExecutorServiceTest.java | 159 +++++++++++++++++++++ .../org/apache/qpid/thread/ThreadFactoryTest.java | 63 ++++++-- .../apache/qpid/client/MessageListenerTest.java | 106 +++++++++++--- java/test-profiles/08StandaloneExcludes | 2 +- 10 files changed, 480 insertions(+), 70 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java create mode 100644 java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5c41e483c..1f940b62f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -363,7 +362,7 @@ public abstract class AMQSessionNote that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + *

Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close * the channel. This is because the channel is marked as closed before the request to close it is made, so the * fail-over should not re-open it. * - * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. * @todo Be aware of possible changes to parameter order as versions change. diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c16941b341..eb5af119b2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.mina.filter.codec.ProtocolCodecException; @@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; @@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory; * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. * - *

This handler is responsibile for setting up the filter chain to filter all events for this handler through. + *

This handler is responsible for setting up the filter chain to filter all events for this handler through. * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. @@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory; * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * be merged, although there is sense in keeping the session model separate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler implements ProtocolEngine @@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ + /** The last failover exception that occurred */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ @@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); @@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if(!_connection.isClosed()) { - Thread failoverThread = new Thread(_failoverHandler); + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } failoverThread.setName("Failover"); // Do not inherit daemon-ness from current thread as this can be a daemon // thread such as a AnonymousIoService thread. @@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any * protocol level waits. * * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should @@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine } //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be - // interupted unless failover cannot restore the state. + // interrupted unless failover cannot restore the state. propagateExceptionToFrameListeners(_lastFailoverException); } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 20a30b3ed3..8152a1f5e9 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,10 +22,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -36,7 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; * *

(), + _threadFactory); } + } @@ -137,7 +150,7 @@ public class ReferenceCountingExecutorService } /** - * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls + * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls * to zero, the executor service is shut down. */ public void releaseExecutorService() @@ -169,4 +182,34 @@ public class ReferenceCountingExecutorService { return _refCount; } + + /** + * + * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * + * @return thread factory + */ + public ThreadFactory getThreadFactory() + { + return _threadFactory; + } + + /** + * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + *

+ * If the pool has been already created, the change will have no effect until + * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason, + * callers must invoke this method before calling {@link #acquireExecutorService()}. + * + * @param threadFactory thread factory + */ + public void setThreadFactory(final ThreadFactory threadFactory) + { + if (threadFactory == null) + { + throw new NullPointerException("threadFactory cannot be null"); + } + _threadFactory = threadFactory; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java index 9786d8fc3f..a96dac4109 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,29 +19,25 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; -public class DefaultThreadFactory implements ThreadFactory -{ - - private static class QpidThread extends Thread - { - private QpidThread(final Runnable target) - { - super(target); - } - } +public class DefaultThreadFactory implements ThreadFactory +{ + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); public Thread createThread(Runnable r) { - return new Thread(r); + Thread t = new Thread(r); + t.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return t; } public Thread createThread(Runnable r, int priority) { - Thread t = new Thread(r); + Thread t = createThread(r); t.setPriority(priority); return t; } diff --git a/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java new file mode 100644 index 0000000000..192675edcd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.thread; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * An {@link UncaughtExceptionHandler} that writes the exception to the application log via + * the SLF4J framework. Once registered with {@link Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)} + * it will be invoked by the JVM when a thread has been abruptly terminated due to an uncaught exception. + * Owing to the contract of {@link Runnable#run()}, the only possible exception types which can cause such a termination + * are instances of {@link RuntimeException} and {@link Error}. These exceptions are catastrophic and the client must + * restart the JVM. + *

+ * The implementation also invokes {@link ThreadGroup#uncaughtException(Thread, Throwable)}. This + * is done to retain compatibility with any monitoring solutions (for example, log scraping of + * standard error) that existing users of older Qpid client libraries may have in place. + * + */ +public class LoggingUncaughtExceptionHandler implements UncaughtExceptionHandler +{ + private static final Logger _logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(Thread t, Throwable e) + { + try + { + _logger.error("Uncaught exception in thread \"{}\"", t.getName(), e); + } + finally + { + // Invoke the thread group's handler too for compatibility with any + // existing clients who are already scraping stderr for such conditions. + t.getThreadGroup().uncaughtException(t, e); + } + } +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java index 0507b3108f..95a8d192c5 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -25,6 +25,8 @@ import java.lang.reflect.Constructor; public class RealtimeThreadFactory implements ThreadFactory { + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); + private Class threadClass; private Constructor threadConstructor; private Constructor priorityParameterConstructor; @@ -62,7 +64,9 @@ public class RealtimeThreadFactory implements ThreadFactory public Thread createThread(Runnable r, int priority) throws Exception { Object priorityParams = priorityParameterConstructor.newInstance(priority); - return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + Thread thread = (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + thread.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return thread; } } diff --git a/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java new file mode 100644 index 0000000000..35998de3a1 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + + +public class ReferenceCountingExecutorServiceTest extends TestCase +{ + + + private ReferenceCountingExecutorService _executorService = ReferenceCountingExecutorService.getInstance(); // Class under test + private ThreadFactory _beforeExecutorThreadFactory; + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _beforeExecutorThreadFactory = _executorService.getThreadFactory(); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + _executorService.setThreadFactory(_beforeExecutorThreadFactory); + } + + + + /** + * Tests that the ReferenceCountingExecutorService correctly manages the reference count. + */ + public void testReferenceCounting() throws Exception + { + final int countBefore = _executorService.getReferenceCount(); + + try + { + _executorService.acquireExecutorService(); + _executorService.acquireExecutorService(); + + assertEquals("Reference count should now be +2", countBefore + 2, _executorService.getReferenceCount()); + } + finally + { + _executorService.releaseExecutorService(); + _executorService.releaseExecutorService(); + } + assertEquals("Reference count should have returned to the initial value", countBefore, _executorService.getReferenceCount()); + } + + /** + * Tests that the executor creates and executes a task using the default thread pool. + */ + public void testExecuteCommandWithDefaultExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final Set threadGroups = new HashSet(); + + _executorService.acquireExecutorService(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using default thread factory", + threadGroups.contains(Thread.currentThread().getThreadGroup())); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + /** + * Tests that the executor creates and executes a task using an overridden thread pool. + */ + public void testExecuteCommandWithOverriddenExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final ThreadGroup expectedThreadGroup = new ThreadGroup("junit"); + _executorService.setThreadFactory(new ThreadGroupChangingThreadFactory(expectedThreadGroup)); + _executorService.acquireExecutorService(); + + final Set threadGroups = new HashSet(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using overridden thread factory", + threadGroups.contains(expectedThreadGroup)); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + private Runnable createRunnable(final CountDownLatch latch, final Set threadGroups) + { + return new Runnable() + { + + public void run() + { + threadGroups.add(Thread.currentThread().getThreadGroup()); + latch.countDown(); + } + + }; + } + + private final class ThreadGroupChangingThreadFactory implements ThreadFactory + { + private final ThreadGroup _newGroup; + + private ThreadGroupChangingThreadFactory(final ThreadGroup newGroup) + { + this._newGroup = newGroup; + } + + public Thread newThread(Runnable r) + { + return new Thread(_newGroup, r); + } + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java index 7f17592893..7b0f93700a 100644 --- a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java +++ b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,18 +19,22 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; import junit.framework.TestCase; +/** + * Tests the ThreadFactory. + */ public class ThreadFactoryTest extends TestCase { public void testThreadFactory() { - Class threadFactoryClass = null; + Class threadFactoryClass = null; try { threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory", - "org.apache.qpid.thread.DefaultThreadFactory")); + "org.apache.qpid.thread.DefaultThreadFactory")).asSubclass(ThreadFactory.class); } // If the thread factory class was wrong it will flagged way before it gets here. catch(Exception e) @@ -41,20 +44,19 @@ public class ThreadFactoryTest extends TestCase assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass()); } - - public void testThreadCreate() + + /** + * Tests creating a thread without a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithDefaultPriority() { - Runnable r = new Runnable(){ - - public void run(){ - - } - }; + Runnable r = createRunnable(); Thread t = null; try { - t = Threading.getThreadFactory().createThread(r,5); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { @@ -62,6 +64,41 @@ public class ThreadFactoryTest extends TestCase } assertNotNull(t); - assertEquals(5,t.getPriority()); + assertEquals(Thread.NORM_PRIORITY, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + /** + * Tests creating thread with a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithSpecifiedPriority() + { + Runnable r = createRunnable(); + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r, 4); + } + catch(Exception e) + { + fail("Error creating thread using Qpid thread factory"); + } + + assertNotNull(t); + assertEquals(4, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + private Runnable createRunnable() + { + Runnable r = new Runnable(){ + + public void run(){ + + } + }; + return r; } } diff --git a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java index 11d9ce2bbc..e4d1c72208 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.client; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -46,14 +50,15 @@ import java.util.concurrent.TimeUnit; * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ -public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener +public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener { private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); Context _context; private static final int MSG_COUNT = 5; - private int receivedCount = 0; + private int _receivedCount = 0; + private int _errorCount = 0; private MessageConsumer _consumer; private Connection _clientConnection; private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); @@ -94,11 +99,14 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi protected void tearDown() throws Exception { - _clientConnection.close(); + if (_clientConnection != null) + { + _clientConnection.close(); + } super.tearDown(); } - public void testSynchronousRecieve() throws Exception + public void testSynchronousReceive() throws Exception { for (int msg = 0; msg < MSG_COUNT; msg++) { @@ -106,15 +114,15 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi } } - public void testSynchronousRecieveNoWait() throws Exception + public void testSynchronousReceiveNoWait() throws Exception { for (int msg = 0; msg < MSG_COUNT; msg++) { - assertTrue(_consumer.receiveNoWait() != null); + assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null); } } - public void testAsynchronousRecieve() throws Exception + public void testAsynchronousReceive() throws Exception { _consumer.setMessageListener(this); @@ -128,18 +136,17 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi { // do nothing } - // Should have recieved all async messages - assertEquals(MSG_COUNT, receivedCount); + // Should have received all async messages + assertEquals(MSG_COUNT, _receivedCount); } - public void testRecieveThenUseMessageListener() throws Exception + public void testReceiveThenUseMessageListener() throws Exception { - _logger.error("Test disabled as initial receive is not called first"); // Perform initial receive to start connection assertTrue(_consumer.receive(2000) != null); - receivedCount++; + _receivedCount++; // Sleep to ensure remaining 4 msgs end up on _synchronousQueue Thread.sleep(1000); @@ -157,8 +164,8 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi { // do nothing } - // Should have recieved all async messages - assertEquals(MSG_COUNT, receivedCount); + // Should have received all async messages + assertEquals(MSG_COUNT, _receivedCount); _clientConnection.close(); @@ -172,14 +179,81 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi assertTrue(cons.receive(2000) == null); } + /** + * Tests the case where the message listener throws an java.lang.Error. + * + */ + public void testMessageListenerThrowsError() throws Exception + { + final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; + _clientConnection.setExceptionListener(this); + + _awaitMessages = new CountDownLatch(1); + + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + _logger.debug("onMessage called"); + _receivedCount++; + + + throw new Error(javaLangErrorMessageText); + } + finally + { + _awaitMessages.countDown(); + } + } + }); + + + _logger.info("Waiting 3 seconds for message"); + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); + + assertEquals("onMessage should have been called", 1, _receivedCount); + assertEquals("onException should NOT have been called", 0, _errorCount); + + // Check that Error has been written to the application log. + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected message not written to log file.", + _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); + + if (_clientConnection != null) + { + try + { + _clientConnection.close(); + } + catch (JMSException e) + { + // Ignore connection close errors for this test. + } + finally + { + _clientConnection = null; + } + } + } + public void onMessage(Message message) { - _logger.info("Received Message(" + receivedCount + "):" + message); + _logger.info("Received Message(" + _receivedCount + "):" + message); - receivedCount++; + _receivedCount++; _awaitMessages.countDown(); } + @Override + public void onException(JMSException e) + { + _logger.info("Exception received", e); + _errorCount++; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(MessageListenerTest.class); diff --git a/java/test-profiles/08StandaloneExcludes b/java/test-profiles/08StandaloneExcludes index 214ee83bb2..b482a14c6d 100644 --- a/java/test-profiles/08StandaloneExcludes +++ b/java/test-profiles/08StandaloneExcludes @@ -34,6 +34,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* -org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait +org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism -- cgit v1.2.1