diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-09-28 15:33:18 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-09-28 15:33:18 +0000 |
| commit | fc1ba27a41f5a6223c7331004d1ab3a44ec07cf6 (patch) | |
| tree | 591c591d862cfa87d34284dba8424bb4e2a71b7f /java/client/src | |
| parent | 490d068a2aeed02fb7521caccee7bff6c1b77bb7 (diff) | |
| download | qpid-python-fc1ba27a41f5a6223c7331004d1ab3a44ec07cf6.tar.gz | |
QPID-1871 : Updated RollbackOrderTest to include an onMessage test. Fixed deadlock issue with 0-10 rollback method and onMessage usage. Moved 0-10 rollback strategy to the abstract AMQSession and updated 0-8 to use that approach.
0-8 Still excluded from test runs as the race condition is not that the dispatcher would hold a message and reject after the TxRollback. The problem is the Java Broker sends a message out after the FlowOk message so the Dispatcher then sits on it, see QPID-2116. This exact problem was hidden due to the way the Dispatcher is stopped. This problem has not been addressed. The request to stop the dispatcher can only actually stop the dispatcher whilst it is in the middle of processing a message. The stop needs to interrupt the _queue.take() and hold the dispatcher AFTER the processing of any message that it needs to do: see QPID-2117.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
3 files changed, 58 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2e3e417c95..5c565989d7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; +import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1559,6 +1560,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + + syncDispatchQueue(); + + _dispatcher.rollback(); + releaseForRollback(); sendRollback(); @@ -1851,26 +1860,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { - startDispatcherIfNecessary(); syncDispatchQueue(); } void syncDispatchQueue() { - final CountDownLatch signal = new CountDownLatch(1); - _queue.add(new Dispatchable() { - public void dispatch(AMQSession ssn) + if (Thread.currentThread() == _dispatcherThread) + { + while (!_closed.get() && !_queue.isEmpty()) { - signal.countDown(); + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); } - }); - try - { - signal.await(); } - catch (InterruptedException e) + else { - throw new RuntimeException(e); + startDispatcherIfNecessary(); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0644bd88a8..1587d6a6bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void releaseForRollback() { - startDispatcherIfNecessary(); - syncDispatchQueue(); - _dispatcher.rollback(); getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); _txRangeSet.clear(); _txSize = 0; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index d7196c0abb..bc1453beaf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void releaseForRollback() { + // Reject all the messages that have been received in this session and + // have not yet been acknowledged. Should look to remove + // _deliveredMessageTags and use _txRangeSet as used by 0-10. + // Otherwise messages will be able to arrive out of order to a second + // consumer on the queue. Whilst this is within the JMS spec it is not + // user friendly and avoidable. while (true) { Long tag = _deliveredMessageTags.poll(); @@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B rejectMessage(tag, true); } - - if (_dispatcher != null) - { - _dispatcher.rollback(); - } } public void rejectMessage(long deliveryTag, boolean requeue) |
