diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-04-29 22:28:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-04-29 22:28:31 +0000 |
| commit | 0c8f372f71409444dd9f3bc38c481c1ec6ba4827 (patch) | |
| tree | 5bb4d9fed03bf57a402f7f8ce17a2e241222cf53 /java/client | |
| parent | 4ffe19dfe01e2de80b46943894501719b5e939ad (diff) | |
| download | qpid-python-0c8f372f71409444dd9f3bc38c481c1ec6ba4827.tar.gz | |
QPID-983: fixed deadlock between AMQConnection.close and FailoverHandler
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652173 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 84 |
1 files changed, 47 insertions, 37 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 3969eef8a9..ae54d9b9fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -914,6 +914,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(List<AMQSession> sessions, long timeout) throws JMSException { + if (!_closed.getAndSet(true)) + { + doClose(sessions, timeout); + } + } + + private void doClose(List<AMQSession> sessions, long timeout) throws JMSException + { synchronized(_sessionCreationLock) { if(!sessions.isEmpty()) @@ -921,18 +929,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQSession session = sessions.remove(0); synchronized(session.getMessageDeliveryLock()) { - close(sessions, timeout); + doClose(sessions, timeout); } } else { - if (!_closed.getAndSet(true)) + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { - try - { - long startCloseTime = System.currentTimeMillis(); + long startCloseTime = System.currentTimeMillis(); closeAllSessions(null, timeout, startCloseTime); @@ -941,41 +947,40 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_taskPool.isTerminated()) { - try - { - // adjust timeout - long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - - _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } - } - - // adjust timeout - timeout = adjustTimeout(timeout, startCloseTime); - _delegate.closeConneciton(timeout); + try + { + // adjust timeout + long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interupt any running tasks. - if (!_taskPool.isTerminated()) + _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { - List<Runnable> tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } + _logger.info("Interrupted while shutting down connection thread pool."); } } - catch (AMQException e) + + // adjust timeout + timeout = adjustTimeout(timeout, startCloseTime); + _delegate.closeConneciton(timeout); + + //If the taskpool hasn't shutdown by now then give it shutdownNow. + // This will interupt any running tasks. + if (!_taskPool.isTerminated()) { - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - throw jmse; + List<Runnable> tasks = _taskPool.shutdownNow(); + for (Runnable r : tasks) + { + _logger.warn("Connection close forced taskpool to prevent execution:" + r); + } } } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + throw jmse; + } } } } @@ -1294,12 +1299,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + boolean closer = false; + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence if (cause instanceof IOException) { - _closed.set(true); + closer = !_closed.getAndSet(true); } if (_exceptionListener != null) @@ -1320,8 +1327,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.info("Closing AMQConnection due to :" + cause.getMessage()); } - _closed.set(true); - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closer = (!_closed.getAndSet(true)) || closer; + if (closer) + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } } catch (JMSException e) { |
