summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-20 12:19:33 +0000
committerKeith Wall <kwall@apache.org>2015-03-20 12:19:33 +0000
commita3132b9031d594ffccefd0ce6b9c2d3f19952d65 (patch)
tree8535490fe01ce78882cded2e6962d7130dfbb305 /qpid/java/client
parent87629732fae81a4e9ac1a500e878dc3c57dc3ab8 (diff)
downloadqpid-python-a3132b9031d594ffccefd0ce6b9c2d3f19952d65.tar.gz
QPID-6460, QPID-6460: [Java Client] Make task pool used for exception reporting duties exactly one thread to serialise the callbacks
Also, * name the task pool thread (for diagnostic purposes) * no longer forcedily shutdown the pool on close as an unexpected InterruptedException may corrupt an application's state git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1668000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java71
1 files changed, 24 insertions, 47 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ec60bd2914..717ebcc86f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionConsumer;
@@ -89,7 +89,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT,
ClientProperties.DEFAULT_CLOSE_TIMEOUT);
- private final long _connectionNumber;
+ private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -160,8 +160,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
- private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ /**
+ * Thread Pool for executing connection level processes such as reporting asynchronous exceptions
+ * and for 0-8..0-91 returning bounced messages.
+ */
+ private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread thread = new Thread(r, "Connection_" + AMQConnection.this._connectionNumber + "_task");
+ if (!thread.isDaemon())
+ {
+ thread.setDaemon(true);
+ }
+
+ return thread;
+ }
+ });
private AMQConnectionDelegate _delegate;
@@ -255,8 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new IllegalArgumentException("Connection must be specified");
}
- _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
-
if (_logger.isDebugEnabled())
{
_logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
@@ -545,18 +559,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_connectionMetaData = new QpidConnectionMetaData(this);
}
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
@@ -935,8 +937,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
finally
{
- //This MUST occur after we have successfully closed all Channels/Sessions
- shutdownTaskPool(timeout);
+ shutdownTaskPool();
}
}
catch (JMSException e)
@@ -960,35 +961,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
-
}
- private void shutdownTaskPool(final long timeout)
+ private void shutdownTaskPool()
{
_taskPool.shutdown();
-
- if (!_taskPool.isTerminated())
- {
- try
- {
- _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
-
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interrupt any running tasks.
- if (!_taskPool.isTerminated())
- {
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
- }
}
/**
@@ -1388,8 +1365,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
});
- }
- else
+ }
+ else
{
_logger.error("Throwable Received but no listener set: " + cause);
}