diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-20 12:19:33 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-20 12:19:33 +0000 |
| commit | a3132b9031d594ffccefd0ce6b9c2d3f19952d65 (patch) | |
| tree | 8535490fe01ce78882cded2e6962d7130dfbb305 /qpid/java/client | |
| parent | 87629732fae81a4e9ac1a500e878dc3c57dc3ab8 (diff) | |
| download | qpid-python-a3132b9031d594ffccefd0ce6b9c2d3f19952d65.tar.gz | |
QPID-6460, QPID-6460: [Java Client] Make task pool used for exception reporting duties exactly one thread to serialise the callbacks
Also,
* name the task pool thread (for diagnostic purposes)
* no longer forcedily shutdown the pool on close as an unexpected InterruptedException may corrupt an application's state
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1668000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 71 |
1 files changed, 24 insertions, 47 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ec60bd2914..717ebcc86f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import javax.jms.ConnectionConsumer; @@ -89,7 +89,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, ClientProperties.DEFAULT_CLOSE_TIMEOUT); - private final long _connectionNumber; + private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -160,8 +160,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); - /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ - private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + /** + * Thread Pool for executing connection level processes such as reporting asynchronous exceptions + * and for 0-8..0-91 returning bounced messages. + */ + private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory() + { + @Override + public Thread newThread(final Runnable r) + { + Thread thread = new Thread(r, "Connection_" + AMQConnection.this._connectionNumber + "_task"); + if (!thread.isDaemon()) + { + thread.setDaemon(true); + } + + return thread; + } + }); private AMQConnectionDelegate _delegate; @@ -255,8 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new IllegalArgumentException("Connection must be specified"); } - _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); - if (_logger.isDebugEnabled()) { _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL); @@ -545,18 +559,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _connectionMetaData = new QpidConnectionMetaData(this); } - protected boolean checkException(Throwable thrown) - { - Throwable cause = thrown.getCause(); - - if (cause == null) - { - cause = thrown; - } - - return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); - } - private void initDelegate(ProtocolVersion pe) throws AMQProtocolException { try @@ -935,8 +937,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } finally { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); + shutdownTaskPool(); } } catch (JMSException e) @@ -960,35 +961,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } - } - private void shutdownTaskPool(final long timeout) + private void shutdownTaskPool() { _taskPool.shutdown(); - - if (!_taskPool.isTerminated()) - { - try - { - _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } - } - - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interrupt any running tasks. - if (!_taskPool.isTerminated()) - { - List<Runnable> tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } - } } /** @@ -1388,8 +1365,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } }); - } - else + } + else { _logger.error("Throwable Received but no listener set: " + cause); } |
