summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-10 09:32:24 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-10 09:32:24 +0000
commitda8935e0491a2b8e9edb6671e5874b77516ee2a9 (patch)
treeda388901de29284fc33849d906b80c52a0a8ee07 /qpid/java
parentfd62e0144c11a78253174f44fb84aa0964c20ec6 (diff)
downloadqpid-python-da8935e0491a2b8e9edb6671e5874b77516ee2a9.tar.gz
QPID-6374 : Reorder ock aquisition and remove synchronization where it is not desired to reduce deadlocks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658652 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java109
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java111
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java41
3 files changed, 133 insertions, 128 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6c37462011..c7fcde824a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -865,13 +865,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close(long timeout) throws JMSException
{
- close(new ArrayList<AMQSession>(_sessions.values()), timeout);
- }
+ boolean closed;
- public void close(List<AMQSession> sessions, long timeout) throws JMSException
- {
- if (!setClosed())
+ synchronized (_sessionCreationLock)
+ {
+ closed = setClosed();
+ }
+
+ if (!closed)
{
+ List<AMQSession> sessions = new ArrayList<>(_sessions.values());
+
setClosing(true);
try
{
@@ -886,54 +890,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
- synchronized (_sessionCreationLock)
+ if (!sessions.isEmpty())
{
- if (!sessions.isEmpty())
+ AMQSession session = sessions.remove(0);
+ synchronized (session.getMessageDeliveryLock())
{
- AMQSession session = sessions.remove(0);
- synchronized (session.getMessageDeliveryLock())
- {
- doClose(sessions, timeout);
- }
+ doClose(sessions, timeout);
}
- else
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
{
- synchronized (getFailoverMutex())
+ try
{
try
{
- try
- {
- closeAllSessions(null, timeout);
- }
- finally
- {
- //This MUST occur after we have successfully closed all Channels/Sessions
- shutdownTaskPool(timeout);
- }
+ closeAllSessions(null, timeout);
}
- catch (JMSException e)
+ finally
{
- _logger.error("Error closing connection", e);
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ shutdownTaskPool(timeout);
}
- finally
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing connection", e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ finally
+ {
+ try
{
- try
- {
- _delegate.closeConnection(timeout);
- }
- catch (Exception e)
- {
- _logger.warn("Error closing underlying protocol connection", e);
- }
+ _delegate.closeConnection(timeout);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Error closing underlying protocol connection", e);
}
}
}
}
+
}
private void shutdownTaskPool(final long timeout)
@@ -1308,28 +1310,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
- // get the failover mutex before trying to close
- synchronized (getFailoverMutex())
+ // decide if we are going to close the session
+ if (hardError(cause))
{
- // decide if we are going to close the session
- if (hardError(cause))
+ closer = (!setClosed()) || closer;
{
- closer = (!setClosed()) || closer;
- {
- _logger.info("Closing AMQConnection due to :" + cause);
- }
- }
- else
- {
- _logger.info("Not a hard-error connection not closing: " + cause);
+ _logger.info("Closing AMQConnection due to :" + cause);
}
+ }
+ else
+ {
+ _logger.info("Not a hard-error connection not closing: " + cause);
+ }
+
- // if we are closing the connection, close sessions first
- if (closer)
+ // if we are closing the connection, close sessions first
+ if (closer)
+ {
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
{
try
{
- closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, -1);
}
catch (JMSException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 29460bb42d..12e9285af8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -808,7 +808,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void close(long timeout) throws JMSException
{
- close(timeout, true);
+ synchronized (_messageDeliveryLock)
+ {
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (getFailoverMutex())
+ {
+
+ close(timeout, true);
+ }
+ }
}
private void close(long timeout, boolean sendClose) throws JMSException
@@ -822,52 +831,44 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!setClosed())
{
setClosing(true);
- synchronized (getFailoverMutex())
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
+
+ try
{
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_messageDeliveryLock)
+ // If the connection is open or we are in the process
+ // of closing the connection then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_connection.isClosed() || _connection.isClosing())
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
- {
- // If the connection is open or we are in the process
- // of closing the connection then send a cance
- // no point otherwise as the connection will be gone
- if (!_connection.isClosed() || _connection.isClosing())
- {
- if (sendClose)
- {
- sendClose(timeout);
- }
- }
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing session: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- // This is ignored because the channel is already marked as closed so the fail-over process will
- // not re-open it.
- catch (FailoverException e)
- {
- _logger.debug(
- "Got FailoverException during channel close, ignored as channel already marked as closed.");
- }
- catch (TransportException e)
- {
- throw toJMSException("Error closing session:" + e.getMessage(), e);
- }
- finally
+ if (sendClose)
{
- _connection.deregisterSession(_channelId);
+ sendClose(timeout);
}
}
}
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
+ {
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
@@ -899,24 +900,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!setClosed())
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
}
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
+
}
protected void stopDispatcherThread()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b1e606b8e9..9cef1f8dce 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -598,35 +598,38 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (sendClose)
{
+
// The Synchronized block only needs to protect network traffic.
- synchronized (_connection.getFailoverMutex())
+
+ try
{
- try
+ // If the session is open or we are in the process
+ // of closing the session then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_session.isClosed() || _session.isClosing())
{
- // If the session is open or we are in the process
- // of closing the session then send a cance
- // no point otherwise as the connection will be gone
- if (!_session.isClosed() || _session.isClosing())
+ synchronized(_session.getMessageDeliveryLock())
{
- synchronized(_session.getMessageDeliveryLock())
+ synchronized (_connection.getFailoverMutex())
{
sendCancel();
}
}
}
- catch (AMQException e)
- {
- throw new JMSAMQException("Error closing consumer: " + e, e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
- }
- catch (TransportException e)
- {
- throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
- }
}
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Error closing consumer: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+ }
+ catch (TransportException e)
+ {
+ throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+ }
+
}
else
{