diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-10 09:32:24 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-10 09:32:24 +0000 |
| commit | da8935e0491a2b8e9edb6671e5874b77516ee2a9 (patch) | |
| tree | da388901de29284fc33849d906b80c52a0a8ee07 /qpid/java | |
| parent | fd62e0144c11a78253174f44fb84aa0964c20ec6 (diff) | |
| download | qpid-python-da8935e0491a2b8e9edb6671e5874b77516ee2a9.tar.gz | |
QPID-6374 : Reorder ock aquisition and remove synchronization where it is not desired to reduce deadlocks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658652 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 133 insertions, 128 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6c37462011..c7fcde824a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -865,13 +865,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(long timeout) throws JMSException { - close(new ArrayList<AMQSession>(_sessions.values()), timeout); - } + boolean closed; - public void close(List<AMQSession> sessions, long timeout) throws JMSException - { - if (!setClosed()) + synchronized (_sessionCreationLock) + { + closed = setClosed(); + } + + if (!closed) { + List<AMQSession> sessions = new ArrayList<>(_sessions.values()); + setClosing(true); try { @@ -886,54 +890,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List<AMQSession> sessions, long timeout) throws JMSException { - synchronized (_sessionCreationLock) + if (!sessions.isEmpty()) { - if (!sessions.isEmpty()) + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) { - AMQSession session = sessions.remove(0); - synchronized (session.getMessageDeliveryLock()) - { - doClose(sessions, timeout); - } + doClose(sessions, timeout); } - else + } + else + { + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { try { - try - { - closeAllSessions(null, timeout); - } - finally - { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); - } + closeAllSessions(null, timeout); } - catch (JMSException e) + finally { - _logger.error("Error closing connection", e); - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } - finally + } + catch (JMSException e) + { + _logger.error("Error closing connection", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + finally + { + try { - try - { - _delegate.closeConnection(timeout); - } - catch (Exception e) - { - _logger.warn("Error closing underlying protocol connection", e); - } + _delegate.closeConnection(timeout); + } + catch (Exception e) + { + _logger.warn("Error closing underlying protocol connection", e); } } } } + } private void shutdownTaskPool(final long timeout) @@ -1308,28 +1310,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + // decide if we are going to close the session + if (hardError(cause)) { - // decide if we are going to close the session - if (hardError(cause)) + closer = (!setClosed()) || closer; { - closer = (!setClosed()) || closer; - { - _logger.info("Closing AMQConnection due to :" + cause); - } - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause); + _logger.info("Closing AMQConnection due to :" + cause); } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } + - // if we are closing the connection, close sessions first - if (closer) + // if we are closing the connection, close sessions first + if (closer) + { + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { try { - closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 29460bb42d..12e9285af8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -808,7 +808,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void close(long timeout) throws JMSException { - close(timeout, true); + synchronized (_messageDeliveryLock) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (getFailoverMutex()) + { + + close(timeout, true); + } + } } private void close(long timeout, boolean sendClose) throws JMSException @@ -822,52 +831,44 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!setClosed()) { setClosing(true); - synchronized (getFailoverMutex()) + // we pass null since this is not an error case + closeProducersAndConsumers(null); + + try { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + // If the connection is open or we are in the process + // of closing the connection then send a cance + // no point otherwise as the connection will be gone + if (!_connection.isClosed() || _connection.isClosing()) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try - { - // If the connection is open or we are in the process - // of closing the connection then send a cance - // no point otherwise as the connection will be gone - if (!_connection.isClosed() || _connection.isClosing()) - { - if (sendClose) - { - sendClose(timeout); - } - } - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - catch (TransportException e) - { - throw toJMSException("Error closing session:" + e.getMessage(), e); - } - finally + if (sendClose) { - _connection.deregisterSession(_channelId); + sendClose(timeout); } } } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } + finally + { + _connection.deregisterSession(_channelId); + } } } @@ -899,24 +900,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!setClosed()) { - synchronized (_messageDeliveryLock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); } + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); } + } protected void stopDispatcherThread() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b1e606b8e9..9cef1f8dce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -598,35 +598,38 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (sendClose) { + // The Synchronized block only needs to protect network traffic. - synchronized (_connection.getFailoverMutex()) + + try { - try + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) { - // If the session is open or we are in the process - // of closing the session then send a cance - // no point otherwise as the connection will be gone - if (!_session.isClosed() || _session.isClosing()) + synchronized(_session.getMessageDeliveryLock()) { - synchronized(_session.getMessageDeliveryLock()) + synchronized (_connection.getFailoverMutex()) { sendCancel(); } } } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } - catch (TransportException e) - { - throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); - } } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } + } else { |
