diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-08-13 16:19:28 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-08-13 16:19:28 +0000 |
| commit | c700093f4294bb1bcda42f3fc1982dcc57dc44da (patch) | |
| tree | 5fc93a03fed2274513e7da3123258d12f58ffbcd /java/client/src | |
| parent | cf96b23f687c379bd71f465c837379b0966c2184 (diff) | |
| download | qpid-python-c700093f4294bb1bcda42f3fc1982dcc57dc44da.tar.gz | |
QPID-2657: Correct handling of sync on 0-10 client session for exceptions
AMQSession_0_10 is modified to contain a pair of get/set methods for the current
exception, using the set method to post the exception to the listener. The sync
method will now throw an exception if one has occurred and all other methods
that used to call sync()/getCurrentException() can just call sync(0 and get the
expected behaviour.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@985262 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
3 files changed, 95 insertions, 61 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d24ad46512..430a4bd9e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } flushAcknowledgments(); - getQpidSession().sync(); - getQpidSession().close(); - getCurrentException(); + try + { + getQpidSession().sync(); + getQpidSession().close(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().setAutoSync(false); } // We need to sync so that we get notify of an error. - getCurrentException(); + sync(); } /** @@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic autoDelete ? Option.AUTO_DELETE : Option.NONE, exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); } - + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow @@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } - + private long getCapacity(AMQDestination destination) { long capacity = 0; @@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // We need to sync so that we get notify of an error. if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { queueName = amqd.getAMQQueueName(); } - + if (amqd.getDestSyntax() == DestSyntax.BURL) { Map<String,Object> arguments = new HashMap<String,Object>(); @@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { arguments.put("no-local", true); } - + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, @@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - + // passive --> false if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } return queueName; } @@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // ifEmpty --> false // ifUnused --> false // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().txRollback(); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } //------ Private methods @@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Get the latest thrown exception. * - * @throws org.apache.qpid.AMQException get the latest thrown error. + * @throws SessionException get the latest thrown error. */ - public void getCurrentException() throws AMQException + public AMQException getCurrentException() { + AMQException amqe = null; synchronized (_currentExceptionLock) { if (_currentException != null) { - AMQException amqe = _currentException; + amqe = _currentException; _currentException = null; - throw amqe; } } + return amqe; } public void opened(Session ssn) {} @@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void exception(Session ssn, SessionException exc) { - synchronized (_currentExceptionLock) - { - ExecutionException ee = exc.getException(); - int code; - if (ee == null) - { - code = AMQConstant.INTERNAL_ERROR.getCode(); - } - else - { - code = ee.getErrorCode().getValue(); - } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); - _connection.exceptionReceived(amqe); - _currentException = amqe; - } + setCurrentException(exc); } public void closed(Session ssn) {} @@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return Serial.lt((int) currentMark, (int) deliveryTag); } - + public void sync() throws AMQException { - _qpidSession.sync(); - getCurrentException(); + try + { + getQpidSession().sync(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + public void setCurrentException(SessionException se) + { + synchronized (_currentExceptionLock) + { + ExecutionException ee = se.getException(); + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (ee != null) + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + + _connection.exceptionReceived(amqe); + + _currentException = amqe; + } } public AMQMessageDelegateFactory getMessageDelegateFactory() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a942d808a9..eddaa1a6bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); - ((AMQSession_0_10) getSession()).getQpidSession().sync(); - // confirm cancel - getSession().confirmConsumerCancelled(getConsumerTag()); - ((AMQSession_0_10) getSession()).getCurrentException(); + _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + try + { + _0_10session.getQpidSession().sync(); + getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel + } + catch (SessionException se) + { + _0_10session.setCurrentException(se); + } + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - super.notifyMessage(messageFrame); } @@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _0_10session.messageAcknowledge (ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - _0_10session.getCurrentException(); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } } @@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.getCurrentException(); + _0_10session.sync(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index df59be25d0..14e1601993 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() throws JMSException + public void close() { _closed.set(true); _session.deregisterProducer(_producerId); |
