diff options
Diffstat (limited to 'java/client')
3 files changed, 58 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..5c565989d7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; +import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1559,6 +1560,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + + syncDispatchQueue(); + + _dispatcher.rollback(); + releaseForRollback(); sendRollback(); @@ -1851,26 +1860,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { - startDispatcherIfNecessary(); syncDispatchQueue(); } void syncDispatchQueue() { - final CountDownLatch signal = new CountDownLatch(1); - _queue.add(new Dispatchable() { - public void dispatch(AMQSession ssn) + if (Thread.currentThread() == _dispatcherThread) + { + while (!_closed.get() && !_queue.isEmpty()) { - signal.countDown(); + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); } - }); - try - { - signal.await(); } - catch (InterruptedException e) + else { - throw new RuntimeException(e); + startDispatcherIfNecessary(); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0644bd88a8..1587d6a6bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void releaseForRollback() { - startDispatcherIfNecessary(); - syncDispatchQueue(); - _dispatcher.rollback(); getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); _txRangeSet.clear(); _txSize = 0; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d7196c0abb..bc1453beaf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void releaseForRollback() { + // Reject all the messages that have been received in this session and + // have not yet been acknowledged. Should look to remove + // _deliveredMessageTags and use _txRangeSet as used by 0-10. + // Otherwise messages will be able to arrive out of order to a second + // consumer on the queue. Whilst this is within the JMS spec it is not + // user friendly and avoidable. while (true) { Long tag = _deliveredMessageTags.poll(); @@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B rejectMessage(tag, true); } - - if (_dispatcher != null) - { - _dispatcher.rollback(); - } } public void rejectMessage(long deliveryTag, boolean requeue) |
