summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-04-29 22:28:31 +0000
committerRafael H. Schloming <rhs@apache.org>2008-04-29 22:28:31 +0000
commit0c8f372f71409444dd9f3bc38c481c1ec6ba4827 (patch)
tree5bb4d9fed03bf57a402f7f8ce17a2e241222cf53 /java/client
parent4ffe19dfe01e2de80b46943894501719b5e939ad (diff)
downloadqpid-python-0c8f372f71409444dd9f3bc38c481c1ec6ba4827.tar.gz
QPID-983: fixed deadlock between AMQConnection.close and FailoverHandler
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652173 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java84
1 files changed, 47 insertions, 37 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 3969eef8a9..ae54d9b9fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -914,6 +914,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close(List<AMQSession> sessions, long timeout) throws JMSException
{
+ if (!_closed.getAndSet(true))
+ {
+ doClose(sessions, timeout);
+ }
+ }
+
+ private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
+ {
synchronized(_sessionCreationLock)
{
if(!sessions.isEmpty())
@@ -921,18 +929,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQSession session = sessions.remove(0);
synchronized(session.getMessageDeliveryLock())
{
- close(sessions, timeout);
+ doClose(sessions, timeout);
}
}
else
{
- if (!_closed.getAndSet(true))
+ synchronized (getFailoverMutex())
{
- synchronized (getFailoverMutex())
+ try
{
- try
- {
- long startCloseTime = System.currentTimeMillis();
+ long startCloseTime = System.currentTimeMillis();
closeAllSessions(null, timeout, startCloseTime);
@@ -941,41 +947,40 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!_taskPool.isTerminated())
{
- try
- {
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
-
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
-
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interupt any running tasks.
- if (!_taskPool.isTerminated())
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
{
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
+ _logger.info("Interrupted while shutting down connection thread pool.");
}
}
- catch (AMQException e)
+
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+ _delegate.closeConneciton(timeout);
+
+ //If the taskpool hasn't shutdown by now then give it shutdownNow.
+ // This will interupt any running tasks.
+ if (!_taskPool.isTerminated())
{
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- throw jmse;
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
}
}
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
}
}
}
@@ -1294,12 +1299,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ boolean closer = false;
+
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
if (cause instanceof IOException)
{
- _closed.set(true);
+ closer = !_closed.getAndSet(true);
}
if (_exceptionListener != null)
@@ -1320,8 +1327,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
}
- _closed.set(true);
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closer = (!_closed.getAndSet(true)) || closer;
+ if (closer)
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
}
catch (JMSException e)
{