summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 16:09:40 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 16:09:40 +0000
commitc67cfe6cc625835ea7ed4b3af661c4a92989a57f (patch)
treeb5fed59c3e4d267f0ab42ff9877399d9e1c4796a /java/client/src
parent4a3228c8799af99f073d8a1e215058d23a6eb0da (diff)
downloadqpid-python-c67cfe6cc625835ea7ed4b3af661c4a92989a57f.tar.gz
QPID-2657: Make Exceptions propagate to client for 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java57
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java9
6 files changed, 81 insertions, 72 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f8e18f80ee..499d138b84 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
long startCloseTime = System.currentTimeMillis();
- closeAllSessions(null, timeout, startCloseTime);
+ closeAllSessions(null, timeout, startCloseTime);
//This MUST occur after we have successfully closed all Channels/Sessions
_taskPool.shutdown();
@@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.getProtocolSession().notifyError(je);
}
- if (_exceptionListener != null)
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
{
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause.getMessage());
- }
-
- if (hardError(cause))
- {
- try
+ // decide if we are going to close the session
+ if (hardError(cause))
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing AMQConnection due to :" + cause.getMessage());
- }
-
closer = (!_closed.getAndSet(true)) || closer;
- if (closer)
{
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ _logger.info("Closing AMQConnection due to :" + cause);
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error closing all sessions: " + e, e);
+ _logger.info("Not a hard-error connection not closing: " + cause);
+ }
+
+ // deliver the exception if there is a listener
+ if (_exceptionListener != null)
+ {
+ _exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
+
+ // if we are closing the connection, close sessions first
+ if (closer)
+ {
+ try
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing all sessions: " + e, e);
+ }
}
-
- }
- else
- {
- _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 8f67274f53..2ee0a86e7c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
_conn._failoverPolicy.attainedConnection();
- } catch (ProtocolVersionException pe)
+ }
+ catch (ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
- } catch (ConnectionException e)
+ }
+ catch (ConnectionException ce)
{
- throw new AMQException(AMQConstant.CHANNEL_ERROR,
- "cannot connect to broker", e);
+ AMQConstant code = AMQConstant.REPLY_SUCCESS;
+ if (ce.getClose() != null && ce.getClose().getReplyCode() != null)
+ {
+ code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
+ }
+ String msg = "Cannot connect to broker: " + ce.getMessage();
+ throw new AMQException(code, msg, ce);
}
return null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index f5bfea0155..a95380d821 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* The latest qpid Exception that has been raised.
*/
private Object _currentExceptionLock = new Object();
- private SessionException _currentException;
+ private AMQException _currentException;
// a ref on the qpid connection
protected org.apache.qpid.transport.Connection _qpidConnection;
@@ -827,20 +827,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_currentException != null)
{
- SessionException se = _currentException;
+ AMQException amqe = _currentException;
_currentException = null;
- ExecutionException ee = se.getException();
- int code;
- if (ee == null)
- {
- code = 0;
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- throw new AMQException
- (AMQConstant.getConstant(code), se.getMessage(), se);
+ throw amqe;
}
}
}
@@ -869,7 +858,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
synchronized (_currentExceptionLock)
{
- _currentException = exc;
+ ExecutionException ee = exc.getException();
+ int code;
+ if (ee == null)
+ {
+ code = AMQConstant.INTERNAL_ERROR.getCode();
+ }
+ else
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
+ _connection.exceptionReceived(amqe);
+ _currentException = amqe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 9d597d8290..c275905a67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -139,36 +139,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override public void notifyMessage(AbstractJMSMessage jmsMessage)
{
- boolean messageOk = false;
try
{
- messageOk = checkPreConditions(jmsMessage);
- }
- catch (AMQException e)
- {
- _logger.error("Receivecd an Exception when receiving message",e);
- try
- {
-
- getSession().getAMQConnection().getExceptionListener()
- .onException(new JMSAMQException("Error when receiving message", e));
- }
- catch (Exception e1)
+ if (checkPreConditions(jmsMessage))
{
- // we should silently log thie exception as it only hanppens when the connection is closed
- _logger.error("Exception when receiving message", e1);
+ if (isMessageListenerSet() && capacity == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
+ _logger.debug("messageOk, trying to notify");
+ super.notifyMessage(jmsMessage);
}
}
- if (messageOk)
+ catch (AMQException e)
{
- if (isMessageListenerSet() && capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- _logger.debug("messageOk, trying to notify");
- super.notifyMessage(jmsMessage);
+ _logger.error("Receivecd an Exception when receiving message",e);
+ getSession().getAMQConnection().exceptionReceived(e);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 6b7525b796..f874ea08f2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -29,6 +29,7 @@ import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -220,11 +221,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (sync)
{
ssn.sync();
+ ((AMQSession_0_10) getSession()).getCurrentException();
}
-
}
- catch (RuntimeException e)
+ catch (Exception e)
{
JMSException jmse = new JMSException("Exception when sending message");
jmse.setLinkedException(e);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index bc82d6bc62..b392604822 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
@@ -72,12 +73,18 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
+ if (errorCode == AMQConstant.NOT_ALLOWED)
{
_logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
}
+ else if (errorCode == AMQConstant.ACCESS_REFUSED)
+ {
+ _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
+
+ error = new AMQSecurityException(reason == null ? null : reason.toString(), null);
+ }
else
{
_logger.info("Connection close received with error code " + errorCode);