summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 16:09:40 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 16:09:40 +0000
commitc67cfe6cc625835ea7ed4b3af661c4a92989a57f (patch)
treeb5fed59c3e4d267f0ab42ff9877399d9e1c4796a /java
parent4a3228c8799af99f073d8a1e215058d23a6eb0da (diff)
downloadqpid-python-c67cfe6cc625835ea7ed4b3af661c4a92989a57f.tar.gz
QPID-2657: Make Exceptions propagate to client for 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java146
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java57
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java24
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java2
16 files changed, 206 insertions, 186 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index a800ea3328..8b5064e19d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -79,7 +79,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void stateChange(Subscription sub, State oldState, State newState)
{
-
+ // TODO something ? log a message here ?
}
};
private AMQQueue _queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 58dbc95224..e71782b116 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -105,7 +105,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
ExecutionException ex = new ExecutionException();
- ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED);
+ ex.setErrorCode(ExecutionErrorCode.get(cause.getCode()));
ex.setDescription(message);
((ServerSession)session).invoke(ex);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 73ec7f1231..95ac75bc34 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Map;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -233,8 +232,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot subscribe to '" + destination + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
@@ -259,7 +263,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
exchange = exchangeRegistry.getDefaultExchange();
}
-
+
DeliveryProperties delvProps = null;
if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -268,6 +272,17 @@ public class ServerSessionDelegate extends SessionDelegate
}
MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+
+ if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
+ {
+ ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+ String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
+ exception(ssn, xfr, errorCode, description);
+
+ ssn.processed(xfr);
+ return;
+ }
+
final MessageStore store = getVirtualHost(ssn).getMessageStore();
StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
ByteBuffer body = xfr.getBody();
@@ -365,8 +380,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- //TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot flush subscription '" + destination + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
@@ -453,17 +473,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
}
- catch (AMQSecurityException e)
- {
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
- String description = "Permission denied: exchange-name '" + exchangeName + "'";
-
- exception(session, method, errorCode, description);
- }
catch (AMQException e)
{
- //TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot declare exchange '" + exchangeName + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
else
@@ -486,6 +504,7 @@ public class ServerSessionDelegate extends SessionDelegate
session.invoke(ex);
+ session.close();
}
private Exchange getExchange(Session session, String exchangeName)
@@ -543,14 +562,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Exchange in use");
}
- catch (AMQSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + method.getExchange());
- }
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot delete exchange '" + method.getExchange() + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
@@ -630,10 +650,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
virtualHost.getBindingFactory().addBinding(method.getBindingKey(), queue, exchange, method.getArguments());
}
- catch (AMQSecurityException e)
+ catch (AMQException e)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
- + "' to Queue: '" + method.getQueue() + "' not allowed");
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot add binding '" + method.getBindingKey() + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
else
@@ -686,9 +711,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
virtualHost.getBindingFactory().removeBinding(method.getBindingKey(), queue, exchange, null);
}
- catch (AMQSecurityException e)
+ catch (AMQException e)
{
- exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied");
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot remove binding '" + method.getBindingKey() + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
@@ -801,7 +832,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
@Override
- public void queueDeclare(Session session, QueueDeclare method)
+ public void queueDeclare(Session session, final QueueDeclare method)
{
VirtualHost virtualHost = getVirtualHost(session);
@@ -909,8 +940,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot delete '" + method.getQueue() + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
};
@@ -948,16 +984,15 @@ public class ServerSessionDelegate extends SessionDelegate
});
}
}
- catch (AMQSecurityException e)
- {
- String description = "Cannot declare queue('" + queueName + "'), permission denied";
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_ALLOWED;
- exception(session, method, errorCode, description);
- }
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot declare queue '" + queueName + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
@@ -976,7 +1011,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
protected AMQQueue createQueue(final String queueName,
- QueueDeclare body,
+ final QueueDeclare body,
VirtualHost virtualHost,
final ServerSession session)
throws AMQException
@@ -1003,8 +1038,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (AMQException e)
{
- //TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot delete queue '" + body.getQueue() + "': " + e.getMessage();
+ exception(session, body, errorCode, description);
}
}
}
@@ -1071,14 +1111,15 @@ public class ServerSessionDelegate extends SessionDelegate
store.removeQueue(queue);
}
}
- catch (AMQSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName);
- }
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot delete queue '" + queueName + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
@@ -1107,14 +1148,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
queue.clearQueue();
}
- catch (AMQSecurityException e)
- {
- exception(session,method, ExecutionErrorCode.NOT_ALLOWED, "Permission denied: " + queueName);
- }
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
+ if (e.getErrorCode() != null)
+ {
+ errorCode = ExecutionErrorCode.get(e.getErrorCode().getCode());
+ }
+ String description = "Cannot purge queue '" + queueName + "': " + e.getMessage();
+ exception(session, method, errorCode, description);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f8e18f80ee..499d138b84 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1037,7 +1037,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
long startCloseTime = System.currentTimeMillis();
- closeAllSessions(null, timeout, startCloseTime);
+ closeAllSessions(null, timeout, startCloseTime);
//This MUST occur after we have successfully closed all Channels/Sessions
_taskPool.shutdown();
@@ -1433,39 +1433,44 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.getProtocolSession().notifyError(je);
}
- if (_exceptionListener != null)
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
{
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause.getMessage());
- }
-
- if (hardError(cause))
- {
- try
+ // decide if we are going to close the session
+ if (hardError(cause))
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing AMQConnection due to :" + cause.getMessage());
- }
-
closer = (!_closed.getAndSet(true)) || closer;
- if (closer)
{
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ _logger.info("Closing AMQConnection due to :" + cause);
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error closing all sessions: " + e, e);
+ _logger.info("Not a hard-error connection not closing: " + cause);
+ }
+
+ // deliver the exception if there is a listener
+ if (_exceptionListener != null)
+ {
+ _exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
+
+ // if we are closing the connection, close sessions first
+ if (closer)
+ {
+ try
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing all sessions: " + e, e);
+ }
}
-
- }
- else
- {
- _logger.info("Not a hard-error connection not closing: " + cause.getMessage());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 8f67274f53..2ee0a86e7c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -165,13 +165,20 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
_conn._failoverPolicy.attainedConnection();
- } catch (ProtocolVersionException pe)
+ }
+ catch (ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
- } catch (ConnectionException e)
+ }
+ catch (ConnectionException ce)
{
- throw new AMQException(AMQConstant.CHANNEL_ERROR,
- "cannot connect to broker", e);
+ AMQConstant code = AMQConstant.REPLY_SUCCESS;
+ if (ce.getClose() != null && ce.getClose().getReplyCode() != null)
+ {
+ code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
+ }
+ String msg = "Cannot connect to broker: " + ce.getMessage();
+ throw new AMQException(code, msg, ce);
}
return null;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index f5bfea0155..a95380d821 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -128,7 +128,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* The latest qpid Exception that has been raised.
*/
private Object _currentExceptionLock = new Object();
- private SessionException _currentException;
+ private AMQException _currentException;
// a ref on the qpid connection
protected org.apache.qpid.transport.Connection _qpidConnection;
@@ -827,20 +827,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_currentException != null)
{
- SessionException se = _currentException;
+ AMQException amqe = _currentException;
_currentException = null;
- ExecutionException ee = se.getException();
- int code;
- if (ee == null)
- {
- code = 0;
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- throw new AMQException
- (AMQConstant.getConstant(code), se.getMessage(), se);
+ throw amqe;
}
}
}
@@ -869,7 +858,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
synchronized (_currentExceptionLock)
{
- _currentException = exc;
+ ExecutionException ee = exc.getException();
+ int code;
+ if (ee == null)
+ {
+ code = AMQConstant.INTERNAL_ERROR.getCode();
+ }
+ else
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
+ _connection.exceptionReceived(amqe);
+ _currentException = amqe;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 9d597d8290..c275905a67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -139,36 +139,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override public void notifyMessage(AbstractJMSMessage jmsMessage)
{
- boolean messageOk = false;
try
{
- messageOk = checkPreConditions(jmsMessage);
- }
- catch (AMQException e)
- {
- _logger.error("Receivecd an Exception when receiving message",e);
- try
- {
-
- getSession().getAMQConnection().getExceptionListener()
- .onException(new JMSAMQException("Error when receiving message", e));
- }
- catch (Exception e1)
+ if (checkPreConditions(jmsMessage))
{
- // we should silently log thie exception as it only hanppens when the connection is closed
- _logger.error("Exception when receiving message", e1);
+ if (isMessageListenerSet() && capacity == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
+ _logger.debug("messageOk, trying to notify");
+ super.notifyMessage(jmsMessage);
}
}
- if (messageOk)
+ catch (AMQException e)
{
- if (isMessageListenerSet() && capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- _logger.debug("messageOk, trying to notify");
- super.notifyMessage(jmsMessage);
+ _logger.error("Receivecd an Exception when receiving message",e);
+ getSession().getAMQConnection().exceptionReceived(e);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 6b7525b796..f874ea08f2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -29,6 +29,7 @@ import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -220,11 +221,11 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
if (sync)
{
ssn.sync();
+ ((AMQSession_0_10) getSession()).getCurrentException();
}
-
}
- catch (RuntimeException e)
+ catch (Exception e)
{
JMSException jmse = new JMSException("Exception when sending message");
jmse.setLinkedException(e);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index bc82d6bc62..b392604822 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
@@ -72,12 +73,18 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
+ if (errorCode == AMQConstant.NOT_ALLOWED)
{
_logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
}
+ else if (errorCode == AMQConstant.ACCESS_REFUSED)
+ {
+ _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
+
+ error = new AMQSecurityException(reason == null ? null : reason.toString(), null);
+ }
else
{
_logger.info("Connection close received with error code " + errorCode);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 453921ea2b..9c56d36ade 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -20,29 +20,19 @@
*/
package org.apache.qpid.transport;
-import java.util.Collections;
-
+import static org.apache.qpid.transport.Connection.State.*;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import java.io.UnsupportedEncodingException;
import org.apache.qpid.QpidException;
import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import static org.apache.qpid.transport.Connection.State.*;
-
-
/**
* ServerDelegate
*
@@ -96,8 +86,7 @@ public class ServerDelegate extends ConnectionDelegate
SaslServer ss = createSaslServer(mechanism);
if (ss == null)
{
- conn.connectionClose
- (ConnectionCloseCode.CONNECTION_FORCED,
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED,
"null SASL mechanism: " + mechanism);
return;
}
@@ -107,14 +96,14 @@ public class ServerDelegate extends ConnectionDelegate
catch (SaslException e)
{
conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
protected SaslServer createSaslServer(String mechanism)
throws SaslException
{
- SaslServer ss = Sasl.createSaslServer
- (mechanism, "AMQP", "localhost", null, null);
+ SaslServer ss = Sasl.createSaslServer(mechanism, "AMQP", "localhost", null, null);
return ss;
}
@@ -141,6 +130,7 @@ public class ServerDelegate extends ConnectionDelegate
catch (SaslException e)
{
conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index d9a8e5550c..f361012c79 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -734,11 +734,7 @@ public class Session extends SessionInvoker
if (lt(maxComplete, point))
{
- if (state == CLOSED)
- {
- throw new SessionException(getException());
- }
- else
+ if (state != CLOSED)
{
throw new SessionException
(String.format
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 6146f029b2..05f3947654 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -139,6 +139,7 @@ public class SessionDelegate
@Override public void executionException(Session ssn, ExecutionException exc)
{
ssn.setException(exc);
+ ssn.getSessionListener().exception(ssn, new SessionException(exc));
}
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index ee2938f2fe..28125f2f19 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -38,6 +38,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
/**
@@ -59,7 +60,7 @@ import org.apache.qpid.url.URLSyntaxException;
*/
public class SimpleACLTest extends AbstractACLTestCase
{
- public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
+ public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception
{
try
{
@@ -78,7 +79,7 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- public void testAccessVhostAuthorisedGuest() throws IOException, Exception
+ public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception
{
//The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so
//is unable to perform actions such as connecting (and by extension, creating a queue, and consuming
@@ -117,8 +118,7 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- // XXX one
- public void testAccessNoRights() throws Exception
+ public void testAccessNoRightsFailure() throws Exception
{
try
{
@@ -131,13 +131,14 @@ public class SimpleACLTest extends AbstractACLTestCase
}
catch (JMSException e)
{
- // XXX JMSException -> linkedException -> cause = AMQException.403
+ // JMSException -> linkedException -> cause = AMQException (403 or 320)
Exception linkedException = e.getLinkedException();
assertNotNull("There was no linked exception", linkedException);
Throwable cause = linkedException.getCause();
assertNotNull("Cause was null", cause);
- assertTrue("Wrong linked exception type",cause instanceof AMQException);
- assertEquals("Incorrect error code received", 403, ((AMQException) cause).getErrorCode().getCode());
+ assertTrue("Wrong linked exception type", cause instanceof AMQException);
+ AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED;
+ assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode());
}
}
@@ -166,7 +167,6 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- // XXX two
public void testServerDeleteQueueFailure() throws Exception
{
try
@@ -188,12 +188,12 @@ public class SimpleACLTest extends AbstractACLTestCase
}
catch (JMSException e)
{
- // XXX JMSException -> linedException = AMQException.403
+ // JMSException -> linedException = AMQException.403
check403Exception(e.getLinkedException());
}
}
- public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception
+ public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception
{
try
{
@@ -213,7 +213,7 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
+ public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception
{
try
{
@@ -225,8 +225,6 @@ public class SimpleACLTest extends AbstractACLTestCase
sess.createConsumer(sess.createQueue("IllegalQueue"));
- conn.close();
-
fail("Test failed as consumer was created.");
}
catch (JMSException e)
@@ -235,7 +233,7 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception
+ public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception
{
try
{
@@ -257,7 +255,7 @@ public class SimpleACLTest extends AbstractACLTestCase
}
}
- public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
+ public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception
{
try
{
@@ -275,7 +273,6 @@ public class SimpleACLTest extends AbstractACLTestCase
}
catch (AMQException e)
{
- // XXX AMQException.403
check403Exception(e);
}
}
@@ -405,8 +402,6 @@ public class SimpleACLTest extends AbstractACLTestCase
conn.start();
sess.createConsumer(sess.createQueue("Invalid"));
-
- conn.close();
fail("Test failed as consumer was created.");
}
@@ -520,7 +515,7 @@ public class SimpleACLTest extends AbstractACLTestCase
/**
* This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue.
- * The reason the client must be in volved is that the Serve is unable to create its own Temporary Queues.
+ * The reason the client must be involved is that the Server is unable to create its own Temporary Queues.
*
* @throws AMQException
* @throws URLSyntaxException
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
index e0934faf44..9ff143daf3 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java
@@ -61,6 +61,7 @@ public class GlobalQueuesTest extends TestingBaseCase
*/
+
/**
* VirtualHost Plugin Configuration
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
index 9831c74574..08a7b7a6e5 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java
@@ -172,22 +172,8 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList
startPublisher(_destination);
boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
-
- if (!disconnected && isBroker010())
- {
- try
- {
- ((AMQSession_0_10) session).sync();
- }
- catch (AMQException amqe)
- {
- JMSException jmsException = new JMSException(amqe.getMessage());
- jmsException.setLinkedException(amqe);
- jmsException.initCause(amqe);
- _connectionException = jmsException;
- }
- }
-
+
+ assertTrue("Client was not disconnected", disconnected);
assertTrue("Client was not disconnected.", _connectionException != null);
Exception linked = _connectionException.getLinkedException();
@@ -209,11 +195,11 @@ public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionList
assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked);
- assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass());
+ assertTrue("Incorrect linked exception received.", linked instanceof AMQException);
- AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
+ AMQException amqException = (AMQException) linked;
- assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
+ assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, amqException.getErrorCode());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index fb23d80843..8bb588a036 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -145,7 +145,7 @@ public class ConnectionTest extends QpidBrokerTestCase
catch (AMQConnectionFailureException amqe)
{
assertNotNull("No cause set:" + amqe.getMessage(), amqe.getCause());
- assertEquals("Exception was wrong type", AMQAuthenticationException.class, amqe.getCause().getClass());
+ assertTrue("Exception was wrong type", amqe.getCause() instanceof AMQException);
}
finally
{