diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 16:09:40 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 16:09:40 +0000 |
| commit | c67cfe6cc625835ea7ed4b3af661c4a92989a57f (patch) | |
| tree | b5fed59c3e4d267f0ab42ff9877399d9e1c4796a /java/client/src | |
| parent | 4a3228c8799af99f073d8a1e215058d23a6eb0da (diff) | |
| download | qpid-python-c67cfe6cc625835ea7ed4b3af661c4a92989a57f.tar.gz | |
QPID-2657: Make Exceptions propagate to client for 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
6 files changed, 81 insertions, 72 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f8e18f80ee..499d138b84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { long startCloseTime = System.currentTimeMillis(); - closeAllSessions(null, timeout, startCloseTime); + closeAllSessions(null, timeout, startCloseTime); //This MUST occur after we have successfully closed all Channels/Sessions _taskPool.shutdown(); @@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - if (_exceptionListener != null) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause.getMessage()); - } - - if (hardError(cause)) - { - try + // decide if we are going to close the session + if (hardError(cause)) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); - } - closer = (!_closed.getAndSet(true)) || closer; - if (closer) { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Closing AMQConnection due to :" + cause); } } - catch (JMSException e) + else { - _logger.error("Error closing all sessions: " + e, e); + _logger.info("Not a hard-error connection not closing: " + cause); + } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } + + // if we are closing the connection, close sessions first + if (closer) + { + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } - - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 8f67274f53..2ee0a86e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } catch (ProtocolVersionException pe) + } + catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } catch (ConnectionException e) + } + catch (ConnectionException ce) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, - "cannot connect to broker", e); + AMQConstant code = AMQConstant.REPLY_SUCCESS; + if (ce.getClose() != null && ce.getClose().getReplyCode() != null) + { + code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); + } + String msg = "Cannot connect to broker: " + ce.getMessage(); + throw new AMQException(code, msg, ce); } return null; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index f5bfea0155..a95380d821 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * The latest qpid Exception that has been raised. */ private Object _currentExceptionLock = new Object(); - private SessionException _currentException; + private AMQException _currentException; // a ref on the qpid connection protected org.apache.qpid.transport.Connection _qpidConnection; @@ -827,20 +827,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_currentException != null) { - SessionException se = _currentException; + AMQException amqe = _currentException; _currentException = null; - ExecutionException ee = se.getException(); - int code; - if (ee == null) - { - code = 0; - } - else - { - code = ee.getErrorCode().getValue(); - } - throw new AMQException - (AMQConstant.getConstant(code), se.getMessage(), se); + throw amqe; } } } @@ -869,7 +858,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { synchronized (_currentExceptionLock) { - _currentException = exc; + ExecutionException ee = exc.getException(); + int code; + if (ee == null) + { + code = AMQConstant.INTERNAL_ERROR.getCode(); + } + else + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); + _connection.exceptionReceived(amqe); + _currentException = amqe; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9d597d8290..c275905a67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -139,36 +139,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override public void notifyMessage(AbstractJMSMessage jmsMessage) { - boolean messageOk = false; try { - messageOk = checkPreConditions(jmsMessage); - } - catch (AMQException e) - { - _logger.error("Receivecd an Exception when receiving message",e); - try - { - - getSession().getAMQConnection().getExceptionListener() - .onException(new JMSAMQException("Error when receiving message", e)); - } - catch (Exception e1) + if (checkPreConditions(jmsMessage)) { - // we should silently log thie exception as it only hanppens when the connection is closed - _logger.error("Exception when receiving message", e1); + if (isMessageListenerSet() && capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + _logger.debug("messageOk, trying to notify"); + super.notifyMessage(jmsMessage); } } - if (messageOk) + catch (AMQException e) { - if (isMessageListenerSet() && capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } - _logger.debug("messageOk, trying to notify"); - super.notifyMessage(jmsMessage); + _logger.error("Receivecd an Exception when receiving message",e); + getSession().getAMQConnection().exceptionReceived(e); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 6b7525b796..f874ea08f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -29,6 +29,7 @@ import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -220,11 +221,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (sync) { ssn.sync(); + ((AMQSession_0_10) getSession()).getCurrentException(); } - } - catch (RuntimeException e) + catch (Exception e) { JMSException jmse = new JMSException("Exception when sending message"); jmse.setLinkedException(e); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index bc82d6bc62..b392604822 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; @@ -72,12 +73,18 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co if (errorCode != AMQConstant.REPLY_SUCCESS) { - if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED)) + if (errorCode == AMQConstant.NOT_ALLOWED) { _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); } + else if (errorCode == AMQConstant.ACCESS_REFUSED) + { + _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); + + error = new AMQSecurityException(reason == null ? null : reason.toString(), null); + } else { _logger.info("Connection close received with error code " + errorCode); |
