diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-08-13 16:19:28 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-08-13 16:19:28 +0000 |
commit | c700093f4294bb1bcda42f3fc1982dcc57dc44da (patch) | |
tree | 5fc93a03fed2274513e7da3123258d12f58ffbcd | |
parent | cf96b23f687c379bd71f465c837379b0966c2184 (diff) | |
download | qpid-python-c700093f4294bb1bcda42f3fc1982dcc57dc44da.tar.gz |
QPID-2657: Correct handling of sync on 0-10 client session for exceptions
AMQSession_0_10 is modified to contain a pair of get/set methods for the current
exception, using the set method to post the exception to the listener. The sync
method will now throw an exception if one has occurred and all other methods
that used to call sync()/getCurrentException() can just call sync(0 and get the
expected behaviour.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@985262 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 147 insertions, 93 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index e71782b116..8c7b374791 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -105,11 +105,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { ExecutionException ex = new ExecutionException(); - ex.setErrorCode(ExecutionErrorCode.get(cause.getCode())); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); ex.setDescription(message); ((ServerSession)session).invoke(ex); ((ServerSession)session).close(); } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d24ad46512..430a4bd9e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } flushAcknowledgments(); - getQpidSession().sync(); - getQpidSession().close(); - getCurrentException(); + try + { + getQpidSession().sync(); + getQpidSession().close(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().setAutoSync(false); } // We need to sync so that we get notify of an error. - getCurrentException(); + sync(); } /** @@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic autoDelete ? Option.AUTO_DELETE : Option.NONE, exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); } - + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow @@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } - + private long getCapacity(AMQDestination destination) { long capacity = 0; @@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // We need to sync so that we get notify of an error. if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { queueName = amqd.getAMQQueueName(); } - + if (amqd.getDestSyntax() == DestSyntax.BURL) { Map<String,Object> arguments = new HashMap<String,Object>(); @@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { arguments.put("no-local", true); } - + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, @@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - + // passive --> false if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } return queueName; } @@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // ifEmpty --> false // ifUnused --> false // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().txRollback(); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } //------ Private methods @@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Get the latest thrown exception. * - * @throws org.apache.qpid.AMQException get the latest thrown error. + * @throws SessionException get the latest thrown error. */ - public void getCurrentException() throws AMQException + public AMQException getCurrentException() { + AMQException amqe = null; synchronized (_currentExceptionLock) { if (_currentException != null) { - AMQException amqe = _currentException; + amqe = _currentException; _currentException = null; - throw amqe; } } + return amqe; } public void opened(Session ssn) {} @@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void exception(Session ssn, SessionException exc) { - synchronized (_currentExceptionLock) - { - ExecutionException ee = exc.getException(); - int code; - if (ee == null) - { - code = AMQConstant.INTERNAL_ERROR.getCode(); - } - else - { - code = ee.getErrorCode().getValue(); - } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); - _connection.exceptionReceived(amqe); - _currentException = amqe; - } + setCurrentException(exc); } public void closed(Session ssn) {} @@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return Serial.lt((int) currentMark, (int) deliveryTag); } - + public void sync() throws AMQException { - _qpidSession.sync(); - getCurrentException(); + try + { + getQpidSession().sync(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + public void setCurrentException(SessionException se) + { + synchronized (_currentExceptionLock) + { + ExecutionException ee = se.getException(); + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (ee != null) + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + + _connection.exceptionReceived(amqe); + + _currentException = amqe; + } } public AMQMessageDelegateFactory getMessageDelegateFactory() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a942d808a9..eddaa1a6bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); - ((AMQSession_0_10) getSession()).getQpidSession().sync(); - // confirm cancel - getSession().confirmConsumerCancelled(getConsumerTag()); - ((AMQSession_0_10) getSession()).getCurrentException(); + _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + try + { + _0_10session.getQpidSession().sync(); + getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel + } + catch (SessionException se) + { + _0_10session.setCurrentException(se); + } + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - super.notifyMessage(messageFrame); } @@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _0_10session.messageAcknowledge (ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - _0_10session.getCurrentException(); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } } @@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.getCurrentException(); + _0_10session.sync(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index df59be25d0..14e1601993 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() throws JMSException + public void close() { _closed.set(true); _session.deregisterProducer(_producerId); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index ed6f00a51c..13b8e461d4 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -56,7 +56,7 @@ public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { - private static final Logger log = Logger.get(Connection.class); + protected static final Logger log = Logger.get(Connection.class); public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 29389df99a..88dd2d6afa 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -99,5 +99,4 @@ public abstract class ConnectionDelegate ssn.closed(); } } - } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 5e40527c2f..9b84ff422b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -280,7 +280,7 @@ public class Session extends SessionInvoker { if (m != null) { - System.out.println(m); + log.debug("%s", m); } } } @@ -732,8 +732,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - log.debug("%s waiting for[%d]: %d, %s", this, point, - maxComplete, commands); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -741,16 +740,23 @@ public class Session extends SessionInvoker { if (state != CLOSED) { - throw new SessionException - (String.format - ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); + throw new SessionException( + String.format("timed out waiting for sync: complete = %s, point = %s", + maxComplete, point)); + } + else + { + ExecutionException ee = getException(); + if (ee != null) + { + throw new SessionException(ee); + } } } } } - private Map<Integer,ResultFuture<?>> results = - new HashMap<Integer,ResultFuture<?>>(); + private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>(); private ExecutionException exception = null; void result(int command, Struct result) @@ -769,9 +775,8 @@ public class Session extends SessionInvoker { if (exception != null) { - throw new IllegalStateException - (String.format - ("too many exceptions: %s, %s", exception, exc)); + throw new IllegalStateException( + String.format("too many exceptions: %s, %s", exception, exc)); } exception = exc; } @@ -849,8 +854,8 @@ public class Session extends SessionInvoker } else { - throw new SessionException - (String.format("%s timed out waiting for result: %s", + throw new SessionException( + String.format("%s timed out waiting for result: %s", Session.this, this)); } } @@ -961,5 +966,4 @@ public class Session extends SessionInvoker { return String.format("ssn:%s", name); } - } diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 15539c1d07..5d8e4d5565 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -33,11 +33,15 @@ public class SessionDelegate extends MethodDelegate<Session> implements ProtocolDelegate<Session> { - private static final Logger log = Logger.get(SessionDelegate.class); + protected static final Logger log = Logger.get(SessionDelegate.class); - public void init(Session ssn, ProtocolHeader hdr) { } + public void init(Session ssn, ProtocolHeader hdr) + { + log.warn("INIT: [%s] %s", ssn, hdr); + } - public void control(Session ssn, Method method) { + public void control(Session ssn, Method method) + { method.dispatch(ssn, this); } @@ -50,7 +54,10 @@ public class SessionDelegate } } - public void error(Session ssn, ProtocolError error) { } + public void error(Session ssn, ProtocolError error) + { + log.warn("ERROR: [%s] %s", ssn, error); + } public void handle(Session ssn, Method method) { @@ -195,9 +202,11 @@ public class SessionDelegate public void closed(Session session) { + log.warn("CLOSED: [%s]", session); } public void detached(Session session) - { + { + log.warn("DETACHED: [%s]", session); } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index bdd3a0c93b..375a326654 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -424,10 +424,6 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } - /** - * The 0-10 {@code executionSync} command should set the exception status in the session, - * so that the client session object can then throw it as an {@link AMQException}. - */ public void testExecutionExceptionSync() throws Exception { startServer(); @@ -436,11 +432,15 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0", true); - ExecutionException before = ssn.getException(); - assertNull("There should not be an exception stored in the session", before); - ssn.sync(); - ExecutionException after = ssn.getException(); - assertNotNull("There should be an exception stored in the session", after); + try + { + ssn.sync(); + fail("this should have failed"); + } + catch (SessionException exc) + { + assertNotNull(exc.getException()); + } } } |