diff options
Diffstat (limited to 'java/client/src')
3 files changed, 95 insertions, 61 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d24ad46512..430a4bd9e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } flushAcknowledgments(); - getQpidSession().sync(); - getQpidSession().close(); - getCurrentException(); + try + { + getQpidSession().sync(); + getQpidSession().close(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().setAutoSync(false); } // We need to sync so that we get notify of an error. - getCurrentException(); + sync(); } /** @@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic autoDelete ? Option.AUTO_DELETE : Option.NONE, exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); } - + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) { // set the flow @@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } - + private long getCapacity(AMQDestination destination) { long capacity = 0; @@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // We need to sync so that we get notify of an error. if (!nowait) { - getQpidSession().sync(); - getCurrentException(); + sync(); } } @@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { queueName = amqd.getAMQQueueName(); } - + if (amqd.getDestSyntax() == DestSyntax.BURL) { Map<String,Object> arguments = new HashMap<String,Object>(); @@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { arguments.put("no-local", true); } - + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, @@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - + // passive --> false if (!nowait) { // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } return queueName; } @@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // ifEmpty --> false // ifUnused --> false // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } /** @@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } @@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().txRollback(); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + sync(); } //------ Private methods @@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Get the latest thrown exception. * - * @throws org.apache.qpid.AMQException get the latest thrown error. + * @throws SessionException get the latest thrown error. */ - public void getCurrentException() throws AMQException + public AMQException getCurrentException() { + AMQException amqe = null; synchronized (_currentExceptionLock) { if (_currentException != null) { - AMQException amqe = _currentException; + amqe = _currentException; _currentException = null; - throw amqe; } } + return amqe; } public void opened(Session ssn) {} @@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void exception(Session ssn, SessionException exc) { - synchronized (_currentExceptionLock) - { - ExecutionException ee = exc.getException(); - int code; - if (ee == null) - { - code = AMQConstant.INTERNAL_ERROR.getCode(); - } - else - { - code = ee.getErrorCode().getValue(); - } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause()); - _connection.exceptionReceived(amqe); - _currentException = amqe; - } + setCurrentException(exc); } public void closed(Session ssn) {} @@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return Serial.lt((int) currentMark, (int) deliveryTag); } - + public void sync() throws AMQException { - _qpidSession.sync(); - getCurrentException(); + try + { + getQpidSession().sync(); + } + catch (SessionException se) + { + setCurrentException(se); + } + + AMQException amqe = getCurrentException(); + if (amqe != null) + { + throw amqe; + } + } + + public void setCurrentException(SessionException se) + { + synchronized (_currentExceptionLock) + { + ExecutionException ee = se.getException(); + int code = AMQConstant.INTERNAL_ERROR.getCode(); + if (ee != null) + { + code = ee.getErrorCode().getValue(); + } + AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + + _connection.exceptionReceived(amqe); + + _currentException = amqe; + } } public AMQMessageDelegateFactory getMessageDelegateFactory() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index a942d808a9..eddaa1a6bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ @Override void sendCancel() throws AMQException { - ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString()); - ((AMQSession_0_10) getSession()).getQpidSession().sync(); - // confirm cancel - getSession().confirmConsumerCancelled(getConsumerTag()); - ((AMQSession_0_10) getSession()).getCurrentException(); + _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + try + { + _0_10session.getQpidSession().sync(); + getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel + } + catch (SessionException se) + { + _0_10session.setCurrentException(se); + } + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame) { - super.notifyMessage(messageFrame); } @@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _0_10session.messageAcknowledge (ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - _0_10session.getCurrentException(); + + AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) + { + throw amqe; + } } } @@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.getCurrentException(); + _0_10session.sync(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index df59be25d0..14e1601993 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _destination; } - public void close() throws JMSException + public void close() { _closed.set(true); _session.deregisterProducer(_producerId); |
