diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-10 16:15:08 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-10 16:15:08 +0000 |
| commit | 085486ebe5ff21133b9caf1c31625ac6ea356568 (patch) | |
| tree | 7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/java/client | |
| parent | 60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff) | |
| parent | e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff) | |
| download | qpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz | |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
6 files changed, 249 insertions, 180 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 70d91ad817..4c596b88a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -64,7 +65,9 @@ import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -192,6 +195,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private boolean _compressMessages; private int _messageCompressionThresholdSize; + static + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Qpid version : " + QpidProperties.getVersionString()); + } + + // The registering of any additional SASL mechanisms with the Java Security API requires + // SecurityManager permissions. In execution environments such as web containers, + // this may require adjustments to the Java security.policy. + CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Loaded mechanisms " + registry.getMechanisms()); + } + } /** * @param broker brokerdetails * @param username username @@ -847,13 +866,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(long timeout) throws JMSException { - close(new ArrayList<AMQSession>(_sessions.values()), timeout); - } + boolean closed; - public void close(List<AMQSession> sessions, long timeout) throws JMSException - { - if (!setClosed()) + synchronized (_sessionCreationLock) + { + closed = setClosed(); + } + + if (!closed) { + List<AMQSession> sessions = new ArrayList<>(_sessions.values()); + setClosing(true); try { @@ -868,54 +891,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List<AMQSession> sessions, long timeout) throws JMSException { - synchronized (_sessionCreationLock) + if (!sessions.isEmpty()) { - if (!sessions.isEmpty()) + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) { - AMQSession session = sessions.remove(0); - synchronized (session.getMessageDeliveryLock()) - { - doClose(sessions, timeout); - } + doClose(sessions, timeout); } - else + } + else + { + synchronized (getFailoverMutex()) { - synchronized (getFailoverMutex()) + try { try { - try - { - closeAllSessions(null, timeout); - } - finally - { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); - } + closeAllSessions(null, timeout); } - catch (JMSException e) + finally { - _logger.error("Error closing connection", e); - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } - finally + } + catch (JMSException e) + { + _logger.error("Error closing connection", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + finally + { + try + { + _delegate.closeConnection(timeout); + } + catch (Exception e) { - try - { - _delegate.closeConnection(timeout); - } - catch (Exception e) - { - _logger.warn("Error closing underlying protocol connection", e); - } + _logger.warn("Error closing underlying protocol connection", e); } } } } + } private void shutdownTaskPool(final long timeout) @@ -1290,28 +1311,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { - // get the failover mutex before trying to close - synchronized (getFailoverMutex()) + // decide if we are going to close the session + if (hardError(cause)) { - // decide if we are going to close the session - if (hardError(cause)) - { - closer = (!setClosed()) || closer; - { - _logger.info("Closing AMQConnection due to :" + cause); - } - } - else + closer = (!setClosed()) || closer; { - _logger.info("Not a hard-error connection not closing: " + cause); + _logger.info("Closing AMQConnection due to :" + cause); } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } - // if we are closing the connection, close sessions first - if (closer) + + // if we are closing the connection, close sessions first + if (closer) + { + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { try { - closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); } catch (JMSException e) { @@ -1328,16 +1350,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) { - // deliver the exception if there is a listener - ExceptionListener exceptionListener = getExceptionListenerNoCheck(); + final ExceptionListener exceptionListener = getExceptionListenerNoCheck(); if (exceptionListener != null) { - exceptionListener.onException(je); - } - else + performConnectionTask(new Runnable() + { + @Override + public void run() + { + // deliver the exception if there is a listener + try + { + exceptionListener.onException(je); + } + catch (RuntimeException e) + { + _logger.error("Exception occurred in ExceptionListener", e); + } + } + }); + } + else { _logger.error("Throwable Received but no listener set: " + cause); } + + } private boolean hardError(Throwable cause) @@ -1448,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void performConnectionTask(Runnable task) { - _taskPool.execute(task); + try + { + _taskPool.execute(task); + } + catch (RejectedExecutionException e) + { + if(!(isClosed() || isClosing())) + { + throw e; + } + } } public AMQSession getSession(int channelId) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4e9164c3b0..fdeab7ae70 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -291,7 +291,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void closed(Connection conn) { - ConnectionException exc = exception; + final ConnectionException exc = exception; exception = null; if (exc == null) @@ -299,7 +299,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return; } - ConnectionClose close = exc.getClose(); + final ConnectionClose close = exc.getClose(); if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED) { _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); @@ -332,23 +332,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.setClosed(); - ExceptionListener listener = _conn.getExceptionListenerNoCheck(); + final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { _logger.error("connection exception: " + conn, exc); } else { - String code = null; - if (close != null) + _conn.performConnectionTask(new Runnable() { - code = close.getReplyCode().toString(); - } + @Override + public void run() + { + String code = null; + if (close != null) + { + code = close.getReplyCode().toString(); + } + + JMSException ex = new JMSException(exc.getMessage(), code); + ex.setLinkedException(exc); + ex.initCause(exc); + listener.onException(ex); + } + }); - JMSException ex = new JMSException(exc.getMessage(), code); - ex.setLinkedException(exc); - ex.initCause(exc); - listener.onException(ex); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 29460bb42d..12e9285af8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -808,7 +808,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void close(long timeout) throws JMSException { - close(timeout, true); + synchronized (_messageDeliveryLock) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (getFailoverMutex()) + { + + close(timeout, true); + } + } } private void close(long timeout, boolean sendClose) throws JMSException @@ -822,52 +831,44 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!setClosed()) { setClosing(true); - synchronized (getFailoverMutex()) + // we pass null since this is not an error case + closeProducersAndConsumers(null); + + try { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + // If the connection is open or we are in the process + // of closing the connection then send a cance + // no point otherwise as the connection will be gone + if (!_connection.isClosed() || _connection.isClosing()) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try - { - // If the connection is open or we are in the process - // of closing the connection then send a cance - // no point otherwise as the connection will be gone - if (!_connection.isClosed() || _connection.isClosing()) - { - if (sendClose) - { - sendClose(timeout); - } - } - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - catch (TransportException e) - { - throw toJMSException("Error closing session:" + e.getMessage(), e); - } - finally + if (sendClose) { - _connection.deregisterSession(_channelId); + sendClose(timeout); } } } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + catch (TransportException e) + { + throw toJMSException("Error closing session:" + e.getMessage(), e); + } + finally + { + _connection.deregisterSession(_channelId); + } } } @@ -899,24 +900,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!setClosed()) { - synchronized (_messageDeliveryLock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); } + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); } + } protected void stopDispatcherThread() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bb0f0d9b13..143de271a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe private void returnBouncedMessage(final ReturnMessage msg) { - getAMQConnection().performConnectionTask(new Runnable() + try { - public void run() + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + getMessageFactoryRegistry().createMessage(0, + false, + msg.getExchange(), + msg.getRoutingKey(), + msg.getContentHeader(), + msg.getBodies(), + _queueDestinationCache, + _topicDestinationCache, + AMQDestination.UNKNOWN_TYPE); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, - _topicDestinationCache, AMQDestination.UNKNOWN_TYPE); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } else if (errorCode == AMQConstant.NO_ROUTE) - { - getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } else - { - getAMQConnection().exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } + getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, + bouncedMessage, + null)); } - }); + else if (errorCode == AMQConstant.NO_ROUTE) + { + getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } + else + { + getAMQConnection().exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b1e606b8e9..9cef1f8dce 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -598,35 +598,38 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (sendClose) { + // The Synchronized block only needs to protect network traffic. - synchronized (_connection.getFailoverMutex()) + + try { - try + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) { - // If the session is open or we are in the process - // of closing the session then send a cance - // no point otherwise as the connection will be gone - if (!_session.isClosed() || _session.isClosing()) + synchronized(_session.getMessageDeliveryLock()) { - synchronized(_session.getMessageDeliveryLock()) + synchronized (_connection.getFailoverMutex()) { sendCancel(); } } } - catch (AMQException e) - { - throw new JMSAMQException("Error closing consumer: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("FailoverException interrupted basic cancel.", e); - } - catch (TransportException e) - { - throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); - } } + catch (AMQException e) + { + throw new JMSAMQException("Error closing consumer: " + e, e); + } + catch (FailoverException e) + { + throw new JMSAMQException("FailoverException interrupted basic cancel.", e); + } + catch (TransportException e) + { + throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e); + } + } else { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 009598d8a4..ceb2a323ca 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.client; +import java.io.IOException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.url.URLSyntaxException; -import java.io.IOException; - public class MockAMQConnection extends AMQConnection { public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost) @@ -60,4 +60,10 @@ public class MockAMQConnection extends AMQConnection { return super.getDelegate(); } + + @Override + public void performConnectionTask(final Runnable task) + { + task.run(); + } } |
