diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-14 16:13:20 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-14 16:13:20 +0000 |
| commit | 7fee0f39c4539c6348494ad902de348245b0b610 (patch) | |
| tree | a96296ac6faf71e942ff7cfcd3640d43a99af317 /qpid/java/client | |
| parent | a830da546c31f11c6cc7646cb198b7d5d226a8c4 (diff) | |
| download | qpid-python-7fee0f39c4539c6348494ad902de348245b0b610.tar.gz | |
QPID-6152: [Java Client] Allow connection/session close time to be overridden by system property.
Change implementation so that the timeout is applied individually to each session close, the connection and
the shutdown of the task pool. Also moved the shutdown of the task pool to a finally, so that it occurs
even if one or more sessions fail to close.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1631810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 80 |
1 files changed, 33 insertions, 47 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6b3b4601d9..d9298abd0f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -81,7 +81,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); - private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, + ClientProperties.DEFAULT_CLOSE_TIMEOUT); private final long _connectionNumber; @@ -880,44 +881,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - long startCloseTime = System.currentTimeMillis(); - - closeAllSessions(null, timeout, startCloseTime); - - //This MUST occur after we have successfully closed all Channels/Sessions - _taskPool.shutdown(); - - if (!_taskPool.isTerminated()) + try { - try - { - // adjust timeout - long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - - _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } + closeAllSessions(null, timeout); } - - // adjust timeout - timeout = adjustTimeout(timeout, startCloseTime); - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interupt any running tasks. - if (!_taskPool.isTerminated()) + finally { - List<Runnable> tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } } catch (JMSException e) { - _logger.error("error:", e); + _logger.error("Error closing connection", e); JMSException jmse = new JMSException("Error closing connection: " + e); jmse.setLinkedException(e); jmse.initCause(e); @@ -939,16 +915,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private long adjustTimeout(long timeout, long startTime) + private void shutdownTaskPool(final long timeout) { - long now = System.currentTimeMillis(); - timeout -= now - startTime; - if (timeout < 0) + _taskPool.shutdown(); + + if (!_taskPool.isTerminated()) { - timeout = 0; + try + { + _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } } - return timeout; + //If the taskpool hasn't shutdown by now then give it shutdownNow. + // This will interrupt any running tasks. + if (!_taskPool.isTerminated()) + { + List<Runnable> tasks = _taskPool.shutdownNow(); + for (Runnable r : tasks) + { + _logger.warn("Connection close forced taskpool to prevent execution:" + r); + } + } } /** @@ -976,7 +968,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex * before calling this method. */ - private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException + private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); @@ -992,11 +984,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - if (starttime != -1) - { - timeout = adjustTimeout(timeout, starttime); - } - session.close(timeout); } catch (JMSException e) @@ -1042,7 +1029,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - // TODO Auto-generated method stub checkNotClosed(); throw new JmsNotImplementedException(); @@ -1322,7 +1308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } catch (JMSException e) { @@ -1444,7 +1430,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; } public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) |
