From c67cfe6cc625835ea7ed4b3af661c4a92989a57f Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Thu, 22 Jul 2010 16:09:40 +0000 Subject: QPID-2657: Make Exceptions propagate to client for 0-10 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966722 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/subscription/Subscription_0_10.java | 2 +- .../qpid/server/transport/ServerConnection.java | 2 +- .../server/transport/ServerSessionDelegate.java | 146 +++++++++++++-------- .../java/org/apache/qpid/client/AMQConnection.java | 57 ++++---- .../qpid/client/AMQConnectionDelegate_0_10.java | 15 ++- .../org/apache/qpid/client/AMQSession_0_10.java | 31 ++--- .../qpid/client/BasicMessageConsumer_0_10.java | 36 ++--- .../qpid/client/BasicMessageProducer_0_10.java | 5 +- .../handler/ConnectionCloseMethodHandler.java | 9 +- .../org/apache/qpid/transport/ServerDelegate.java | 22 +--- .../java/org/apache/qpid/transport/Session.java | 6 +- .../org/apache/qpid/transport/SessionDelegate.java | 1 + .../qpid/server/security/acl/SimpleACLTest.java | 33 ++--- .../org/apache/qpid/systest/GlobalQueuesTest.java | 1 + .../org/apache/qpid/systest/TestingBaseCase.java | 24 +--- .../unit/client/connection/ConnectionTest.java | 2 +- 16 files changed, 206 insertions(+), 186 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index a800ea3328..8b5064e19d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -79,7 +79,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void stateChange(Subscription sub, State oldState, State newState) { - + // TODO something ? log a message here ? } }; private AMQQueue _queue; diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 58dbc95224..e71782b116 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -105,7 +105,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { ExecutionException ex = new ExecutionException(); - ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED); + ex.setErrorCode(ExecutionErrorCode.get(cause.getCode())); ex.setDescription(message); ((ServerSession)session).invoke(ex); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 73ec7f1231..95ac75bc34 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.Map; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -233,8 +232,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot subscribe to '" + destination + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -259,7 +263,7 @@ public class ServerSessionDelegate extends SessionDelegate { exchange = exchangeRegistry.getDefaultExchange(); } - + DeliveryProperties delvProps = null; if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -268,6 +272,17 @@ public class ServerSessionDelegate extends SessionDelegate } MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + + if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) + { + ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; + String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; + exception(ssn, xfr, errorCode, description); + + ssn.processed(xfr); + return; + } + final MessageStore store = getVirtualHost(ssn).getMessageStore(); StoredMessage storeMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); @@ -365,8 +380,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot flush subscription '" + destination + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -453,17 +473,15 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); } - catch (AMQSecurityException e) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - String description = "Permission denied: exchange-name '" + exchangeName + "'"; - - exception(session, method, errorCode, description); - } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot declare exchange '" + exchangeName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } else @@ -486,6 +504,7 @@ public class ServerSessionDelegate extends SessionDelegate session.invoke(ex); + session.close(); } private Exchange getExchange(Session session, String exchangeName) @@ -543,14 +562,15 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use"); } - catch (AMQSecurityException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + method.getExchange()); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete exchange '" + method.getExchange() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } @@ -630,10 +650,15 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments()); } - catch (AMQSecurityException e) + catch (AMQException e) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange() - + "' to Queue: '" + method.getQueue() + "' not allowed"); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot add binding '" + method.getBindingKey() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } else @@ -686,9 +711,15 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null); } - catch (AMQSecurityException e) + catch (AMQException e) { - exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied"); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot remove binding '" + method.getBindingKey() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -801,7 +832,7 @@ public class ServerSessionDelegate extends SessionDelegate } @Override - public void queueDeclare(Session session, QueueDeclare method) + public void queueDeclare(Session session, final QueueDeclare method) { VirtualHost virtualHost = getVirtualHost(session); @@ -909,8 +940,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete '" + method.getQueue() + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } }; @@ -948,16 +984,15 @@ public class ServerSessionDelegate extends SessionDelegate }); } } - catch (AMQSecurityException e) - { - String description = "Cannot declare queue('" + queueName + "'), permission denied"; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED; - exception(session, method, errorCode, description); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot declare queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -976,7 +1011,7 @@ public class ServerSessionDelegate extends SessionDelegate } protected AMQQueue createQueue(final String queueName, - QueueDeclare body, + final QueueDeclare body, VirtualHost virtualHost, final ServerSession session) throws AMQException @@ -1003,8 +1038,13 @@ public class ServerSessionDelegate extends SessionDelegate } catch (AMQException e) { - //TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete queue '" + body.getQueue() + "': " + e.getMessage(); + exception(session, body, errorCode, description); } } } @@ -1071,14 +1111,15 @@ public class ServerSessionDelegate extends SessionDelegate store.removeQueue(queue); } } - catch (AMQSecurityException e) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot delete queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } @@ -1107,14 +1148,15 @@ public class ServerSessionDelegate extends SessionDelegate { queue.clearQueue(); } - catch (AMQSecurityException e) - { - exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName); - } catch (AMQException e) { - // TODO - throw new RuntimeException(e); + ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; + if (e.getErrorCode() != null) + { + errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode()); + } + String description = "Cannot purge queue '" + queueName + "': " + e.getMessage(); + exception(session, method, errorCode, description); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f8e18f80ee..499d138b84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { long startCloseTime = System.currentTimeMillis(); - closeAllSessions(null, timeout, startCloseTime); + closeAllSessions(null, timeout, startCloseTime); //This MUST occur after we have successfully closed all Channels/Sessions _taskPool.shutdown(); @@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().notifyError(je); } - if (_exceptionListener != null) + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause.getMessage()); - } - - if (hardError(cause)) - { - try + // decide if we are going to close the session + if (hardError(cause)) { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); - } - closer = (!_closed.getAndSet(true)) || closer; - if (closer) { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + _logger.info("Closing AMQConnection due to :" + cause); } } - catch (JMSException e) + else { - _logger.error("Error closing all sessions: " + e, e); + _logger.info("Not a hard-error connection not closing: " + cause); + } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } + + // if we are closing the connection, close sessions first + if (closer) + { + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } } - - } - else - { - _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 8f67274f53..2ee0a86e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } catch (ProtocolVersionException pe) + } + catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } catch (ConnectionException e) + } + catch (ConnectionException ce) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, - "cannot connect to broker", e); + AMQConstant code = AMQConstant.REPLY_SUCCESS; + if (ce.getClose() != null && ce.getClose().getReplyCode() != null) + { + code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); + } + String msg = "Cannot connect to broker: " + ce.getMessage(); + throw new AMQException(code, msg, ce); } return null; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index f5bfea0155..a95380d821 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession linkedException -> cause = AMQException.403 + // JMSException -> linkedException -> cause = AMQException (403 or 320) Exception linkedException = e.getLinkedException(); assertNotNull("There was no linked exception", linkedException); Throwable cause = linkedException.getCause(); assertNotNull("Cause was null", cause); - assertTrue("Wrong linked exception type",cause instanceof AMQException); - assertEquals("Incorrect error code received", 403, ((AMQException) cause).getErrorCode().getCode()); + assertTrue("Wrong linked exception type", cause instanceof AMQException); + AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED; + assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode()); } } @@ -166,7 +167,6 @@ public class SimpleACLTest extends AbstractACLTestCase } } - // XXX two public void testServerDeleteQueueFailure() throws Exception { try @@ -188,12 +188,12 @@ public class SimpleACLTest extends AbstractACLTestCase } catch (JMSException e) { - // XXX JMSException -> linedException = AMQException.403 + // JMSException -> linedException = AMQException.403 check403Exception(e.getLinkedException()); } } - public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception + public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception { try { @@ -213,7 +213,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception + public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception { try { @@ -225,8 +225,6 @@ public class SimpleACLTest extends AbstractACLTestCase sess.createConsumer(sess.createQueue("IllegalQueue")); - conn.close(); - fail("Test failed as consumer was created."); } catch (JMSException e) @@ -235,7 +233,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception + public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception { try { @@ -257,7 +255,7 @@ public class SimpleACLTest extends AbstractACLTestCase } } - public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception + public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception { try { @@ -275,7 +273,6 @@ public class SimpleACLTest extends AbstractACLTestCase } catch (AMQException e) { - // XXX AMQException.403 check403Exception(e); } } @@ -405,8 +402,6 @@ public class SimpleACLTest extends AbstractACLTestCase conn.start(); sess.createConsumer(sess.createQueue("Invalid")); - - conn.close(); fail("Test failed as consumer was created."); } @@ -520,7 +515,7 @@ public class SimpleACLTest extends AbstractACLTestCase /** * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue. - * The reason the client must be in volved is that the Serve is unable to create its own Temporary Queues. + * The reason the client must be involved is that the Server is unable to create its own Temporary Queues. * * @throws AMQException * @throws URLSyntaxException diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java index e0934faf44..9ff143daf3 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -61,6 +61,7 @@ public class GlobalQueuesTest extends TestingBaseCase */ + /** * VirtualHost Plugin Configuration diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java index 9831c74574..08a7b7a6e5 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java @@ -172,22 +172,8 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList startPublisher(_destination); boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - jmsException.initCause(amqe); - _connectionException = jmsException; - } - } - + + assertTrue("Client was not disconnected", disconnected); assertTrue("Client was not disconnected.", _connectionException != null); Exception linked = _connectionException.getLinkedException(); @@ -209,11 +195,11 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + assertTrue("Incorrect linked exception received.", linked instanceof AMQException); - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + AMQException amqException = (AMQException) linked; - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, amqException.getErrorCode()); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index fb23d80843..8bb588a036 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -145,7 +145,7 @@ public class ConnectionTest extends QpidBrokerTestCase catch (AMQConnectionFailureException amqe) { assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause()); - assertEquals("Exception was wrong type", AMQAuthenticationException.class, amqe.getCause().getClass()); + assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException); } finally { -- cgit v1.2.1