From 4d02e072b47235cfb56635412aea6a4ed30e6869 Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Thu, 22 Jul 2010 16:09:40 +0000 Subject: QPID-2657: Make Exceptions propagate to client for 0-10 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@966722 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 57 ++++++++++++---------- .../qpid/client/AMQConnectionDelegate_0_10.java | 15 ++++-- .../org/apache/qpid/client/AMQSession_0_10.java | 31 ++++++------ .../qpid/client/BasicMessageConsumer_0_10.java | 36 +++++--------- .../qpid/client/BasicMessageProducer_0_10.java | 5 +- .../handler/ConnectionCloseMethodHandler.java | 9 +++- 6 files changed, 81 insertions(+), 72 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f8e18f80ee..499d138b84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { long startCloseTime = System.currentTimeMillis(); - closeAllSessions(null, timeout, startCloseTime); + closeAllSessions(null, timeout, startCloseTime); //This MUST occur after we have successfully closed all Channels/Sessions _taskPool.shutdown(); @@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - if (_exceptionListener != null) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause.getMessage()); - } - - if (hardError(cause)) - { - try + // decide if we are going to close the session + if (hardError(cause)) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); - } - closer = (!_closed.getAndSet(true)) || closer; - if (closer) { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Closing AMQConnection due to :" + cause); } } - catch (JMSException e) + else { - _logger.error("Error closing all sessions: " + e, e); + _logger.info("Not a hard-error connection not closing: " + cause); + } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } + + // if we are closing the connection, close sessions first + if (closer) + { + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } - - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 8f67274f53..2ee0a86e7c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } catch (ProtocolVersionException pe) + } + catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } catch (ConnectionException e) + } + catch (ConnectionException ce) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, - "cannot connect to broker", e); + AMQConstant code = AMQConstant.REPLY_SUCCESS; + if (ce.getClose() != null && ce.getClose().getReplyCode() != null) + { + code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); + } + String msg = "Cannot connect to broker: " + ce.getMessage(); + throw new AMQException(code, msg, ce); } return null; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index f5bfea0155..a95380d821 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession