diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 16:09:40 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 16:09:40 +0000 |
| commit | c67cfe6cc625835ea7ed4b3af661c4a92989a57f (patch) | |
| tree | b5fed59c3e4d267f0ab42ff9877399d9e1c4796a /java | |
| parent | 4a3228c8799af99f073d8a1e215058d23a6eb0da (diff) | |
| download | qpid-python-c67cfe6cc625835ea7ed4b3af661c4a92989a57f.tar.gz | |
QPID-2657: Make Exceptions propagate to client for 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
16 files changed, 206 insertions, 186 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index a800ea3328..8b5064e19d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -79,7 +79,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void stateChange(Subscription sub, State oldState, State newState) { - + // TODO something ? log a message here ? } }; private AMQQueue _queue; diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 58dbc95224..e71782b116 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -105,7 +105,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { ExecutionException ex = new ExecutionException(); - ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED); + ex.setErrorCode(ExecutionErrorCode.get(cause.getCode())); ex.setDescription(message); ((ServerSession)session).invoke(ex); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 73ec7f1231..95ac75bc34 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Map; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -233,8 +232,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot subscribe to '" + destination + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -259,7 +263,7 @@ public class ServerSessionDelegate extends SessionDelegate { exchange = exchangeRegistry.getDefaultExchange(); } - + DeliveryProperties delvProps = null; if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -268,6 +272,17 @@ public class ServerSessionDelegate extends SessionDelegate } MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + + if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; + String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; + exception(ssn, xfr, errorCode, description); + + ssn.processed(xfr); + return; + } + final MessageStore store = getVirtualHost(ssn).getMessageStore(); StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); @@ -365,8 +380,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot flush subscription '" + destination + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -453,17 +473,15 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); } - catch (AMQSecurityException e) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - String description = "Permission denied: exchange-name '" + exchangeName + "'"; - - exception(session, method, errorCode, description); - } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot declare exchange '" + exchangeName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } else @@ -486,6 +504,7 @@ public class ServerSessionDelegate extends SessionDelegate session.invoke(ex); + session.close(); } private Exchange getExchange(Session session, String exchangeName) @@ -543,14 +562,15 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); } - catch (AMQSecurityException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + method.getExchange()); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete exchange '" + method.getExchange() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } @@ -630,10 +650,15 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); } - catch (AMQSecurityException e) + catch (AMQException e) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange() - + "' to Queue: '" + method.getQueue() + "' not allowed"); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot add binding '" + method.getBindingKey() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } else @@ -686,9 +711,15 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); } - catch (AMQSecurityException e) + catch (AMQException e) { - exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied"); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot remove binding '" + method.getBindingKey() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -801,7 +832,7 @@ public class ServerSessionDelegate extends SessionDelegate } @Override - public void queueDeclare(Session session, QueueDeclare method) + public void queueDeclare(Session session, final QueueDeclare method) { VirtualHost virtualHost = getVirtualHost(session); @@ -909,8 +940,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete '" + method.getQueue() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } }; @@ -948,16 +984,15 @@ public class ServerSessionDelegate extends SessionDelegate }); } } - catch (AMQSecurityException e) - { - String description = "Cannot declare queue('" + queueName + "'), permission denied"; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - exception(session, method, errorCode, description); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot declare queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -976,7 +1011,7 @@ public class ServerSessionDelegate extends SessionDelegate } protected AMQQueue createQueue(final String queueName, - QueueDeclare body, + final QueueDeclare body, VirtualHost virtualHost, final ServerSession session) throws AMQException @@ -1003,8 +1038,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete queue '" + body.getQueue() + "': " + e.getMessage(); + exception(session, body, errorCode, description); } } } @@ -1071,14 +1111,15 @@ public class ServerSessionDelegate extends SessionDelegate store.removeQueue(queue); } } - catch (AMQSecurityException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -1107,14 +1148,15 @@ public class ServerSessionDelegate extends SessionDelegate { queue.clearQueue(); } - catch (AMQSecurityException e) - { - exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot purge queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f8e18f80ee..499d138b84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { long startCloseTime = System.currentTimeMillis(); - closeAllSessions(null, timeout, startCloseTime); + closeAllSessions(null, timeout, startCloseTime); //This MUST occur after we have successfully closed all Channels/Sessions _taskPool.shutdown(); @@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - if (_exceptionListener != null) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause.getMessage()); - } - - if (hardError(cause)) - { - try + // decide if we are going to close the session + if (hardError(cause)) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); - } - closer = (!_closed.getAndSet(true)) || closer; - if (closer) { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Closing AMQConnection due to :" + cause); } } - catch (JMSException e) + else { - _logger.error("Error closing all sessions: " + e, e); + _logger.info("Not a hard-error connection not closing: " + cause); + } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } + + // if we are closing the connection, close sessions first + if (closer) + { + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } - - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 8f67274f53..2ee0a86e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } catch (ProtocolVersionException pe) + } + catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } catch (ConnectionException e) + } + catch (ConnectionException ce) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, - "cannot connect to broker", e); + AMQConstant code = AMQConstant.REPLY_SUCCESS; + if (ce.getClose() != null && ce.getClose().getReplyCode() != null) + { + code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); + } + String msg = "Cannot connect to broker: " + ce.getMessage(); + throw new AMQException(code, msg, ce); } return null; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index f5bfea0155..a95380d821 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * The latest qpid Exception that has been raised. */ private Object _currentExceptionLock = new Object(); - private SessionException _currentException; + private AMQException _currentException; // a ref on the qpid connection protected org.apache.qpid.transport.Connection _qpidConnection; @@ -827,20 +827,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_currentException != null) { - SessionException se = _currentException; + AMQException amqe = _currentException; _currentException = null; - ExecutionException ee = se.getException(); - int code; - if (ee == null) - { - code = 0; - } - else - { - code = ee.getErrorCode().getValue(); - } - throw new AMQException - (AMQConstant.getConstant(code), se.getMessage(), se); + throw amqe; } } } @@ -869,7 +858,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { synchronized (_currentExceptionLock) { - _currentException = exc; + ExecutionException ee = exc.getException(); + int code; + if (ee == null) + { + code = AMQConstant.INTERNAL_ERROR.getCode(); + } + else + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); + _connection.exceptionReceived(amqe); + _currentException = amqe; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9d597d8290..c275905a67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -139,36 +139,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override public void notifyMessage(AbstractJMSMessage jmsMessage) { - boolean messageOk = false; try { - messageOk = checkPreConditions(jmsMessage); - } - catch (AMQException e) - { - _logger.error("Receivecd an Exception when receiving message",e); - try - { - - getSession().getAMQConnection().getExceptionListener() - .onException(new JMSAMQException("Error when receiving message", e)); - } - catch (Exception e1) + if (checkPreConditions(jmsMessage)) { - // we should silently log thie exception as it only hanppens when the connection is closed - _logger.error("Exception when receiving message", e1); + if (isMessageListenerSet() && capacity == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + _logger.debug("messageOk, trying to notify"); + super.notifyMessage(jmsMessage); } } - if (messageOk) + catch (AMQException e) { - if (isMessageListenerSet() && capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } - _logger.debug("messageOk, trying to notify"); - super.notifyMessage(jmsMessage); + _logger.error("Receivecd an Exception when receiving message",e); + getSession().getAMQConnection().exceptionReceived(e); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 6b7525b796..f874ea08f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -29,6 +29,7 @@ import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -220,11 +221,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer if (sync) { ssn.sync(); + ((AMQSession_0_10) getSession()).getCurrentException(); } - } - catch (RuntimeException e) + catch (Exception e) { JMSException jmse = new JMSException("Exception when sending message"); jmse.setLinkedException(e); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index bc82d6bc62..b392604822 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; @@ -72,12 +73,18 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co if (errorCode != AMQConstant.REPLY_SUCCESS) { - if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED)) + if (errorCode == AMQConstant.NOT_ALLOWED) { _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); } + else if (errorCode == AMQConstant.ACCESS_REFUSED) + { + _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); + + error = new AMQSecurityException(reason == null ? null : reason.toString(), null); + } else { _logger.info("Connection close received with error code " + errorCode); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 453921ea2b..9c56d36ade 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -20,29 +20,19 @@ */ package org.apache.qpid.transport; -import java.util.Collections; - +import static org.apache.qpid.transport.Connection.State.*; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import java.io.UnsupportedEncodingException; import org.apache.qpid.QpidException; import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import static org.apache.qpid.transport.Connection.State.*; - - /** * ServerDelegate * @@ -96,8 +86,7 @@ public class ServerDelegate extends ConnectionDelegate SaslServer ss = createSaslServer(mechanism); if (ss == null) { - conn.connectionClose - (ConnectionCloseCode.CONNECTION_FORCED, + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, "null SASL mechanism: " + mechanism); return; } @@ -107,14 +96,14 @@ public class ServerDelegate extends ConnectionDelegate catch (SaslException e) { conn.exception(e); + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); } } protected SaslServer createSaslServer(String mechanism) throws SaslException { - SaslServer ss = Sasl.createSaslServer - (mechanism, "AMQP", "localhost", null, null); + SaslServer ss = Sasl.createSaslServer(mechanism, "AMQP", "localhost", null, null); return ss; } @@ -141,6 +130,7 @@ public class ServerDelegate extends ConnectionDelegate catch (SaslException e) { conn.exception(e); + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index d9a8e5550c..f361012c79 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -734,11 +734,7 @@ public class Session extends SessionInvoker if (lt(maxComplete, point)) { - if (state == CLOSED) - { - throw new SessionException(getException()); - } - else + if (state != CLOSED) { throw new SessionException (String.format diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 6146f029b2..05f3947654 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -139,6 +139,7 @@ public class SessionDelegate @Override public void executionException(Session ssn, ExecutionException exc) { ssn.setException(exc); + ssn.getSessionListener().exception(ssn, new SessionException(exc)); } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java index ee2938f2fe..28125f2f19 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; /** @@ -59,7 +60,7 @@ import org.apache.qpid.url.URLSyntaxException; */ public class SimpleACLTest extends AbstractACLTestCase { - public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception + public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception { try { @@ -78,7 +79,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testAccessVhostAuthorisedGuest() throws IOException, Exception + public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception { //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming @@ -117,8 +118,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - // XXX one - public void testAccessNoRights() throws Exception + public void testAccessNoRightsFailure() throws Exception { try { @@ -131,13 +131,14 @@ public class SimpleACLTest extends AbstractACLTestCase } catch (JMSException e) { - // XXX JMSException -> linkedException -> cause = AMQException.403 + // JMSException -> linkedException -> cause = AMQException (403 or 320) Exception linkedException = e.getLinkedException(); assertNotNull("There was no linked exception", linkedException); Throwable cause = linkedException.getCause(); assertNotNull("Cause was null", cause); - assertTrue("Wrong linked exception type",cause instanceof AMQException); - assertEquals("Incorrect error code received", 403, ((AMQException) cause).getErrorCode().getCode()); + assertTrue("Wrong linked exception type", cause instanceof AMQException); + AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED; + assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode()); } } @@ -166,7 +167,6 @@ public class SimpleACLTest extends AbstractACLTestCase } } - // XXX two public void testServerDeleteQueueFailure() throws Exception { try @@ -188,12 +188,12 @@ public class SimpleACLTest extends AbstractACLTestCase } catch (JMSException e) { - // XXX JMSException -> linedException = AMQException.403 + // JMSException -> linedException = AMQException.403 check403Exception(e.getLinkedException()); } } - public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception + public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception { try { @@ -213,7 +213,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception + public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception { try { @@ -225,8 +225,6 @@ public class SimpleACLTest extends AbstractACLTestCase sess.createConsumer(sess.createQueue("IllegalQueue")); - conn.close(); - fail("Test failed as consumer was created."); } catch (JMSException e) @@ -235,7 +233,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception + public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception { try { @@ -257,7 +255,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception + public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception { try { @@ -275,7 +273,6 @@ public class SimpleACLTest extends AbstractACLTestCase } catch (AMQException e) { - // XXX AMQException.403 check403Exception(e); } } @@ -405,8 +402,6 @@ public class SimpleACLTest extends AbstractACLTestCase conn.start(); sess.createConsumer(sess.createQueue("Invalid")); - - conn.close(); fail("Test failed as consumer was created."); } @@ -520,7 +515,7 @@ public class SimpleACLTest extends AbstractACLTestCase /** * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue. - * The reason the client must be in volved is that the Serve is unable to create its own Temporary Queues. + * The reason the client must be involved is that the Server is unable to create its own Temporary Queues. * * @throws AMQException * @throws URLSyntaxException diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java index e0934faf44..9ff143daf3 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -61,6 +61,7 @@ public class GlobalQueuesTest extends TestingBaseCase */ + /** * VirtualHost Plugin Configuration diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java index 9831c74574..08a7b7a6e5 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java @@ -172,22 +172,8 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList startPublisher(_destination); boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - jmsException.initCause(amqe); - _connectionException = jmsException; - } - } - + + assertTrue("Client was not disconnected", disconnected); assertTrue("Client was not disconnected.", _connectionException != null); Exception linked = _connectionException.getLinkedException(); @@ -209,11 +195,11 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + assertTrue("Incorrect linked exception received.", linked instanceof AMQException); - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + AMQException amqException = (AMQException) linked; - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, amqException.getErrorCode()); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index fb23d80843..8bb588a036 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -145,7 +145,7 @@ public class ConnectionTest extends QpidBrokerTestCase catch (AMQConnectionFailureException amqe) { assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause()); - assertEquals("Exception was wrong type", AMQAuthenticationException.class, amqe.getCause().getClass()); + assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException); } finally { |
