summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java11
3 files changed, 58 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2e3e417c95..5c565989d7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -91,6 +91,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1559,6 +1560,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+
+ syncDispatchQueue();
+
+ _dispatcher.rollback();
+
releaseForRollback();
sendRollback();
@@ -1851,26 +1860,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
- startDispatcherIfNecessary();
syncDispatchQueue();
}
void syncDispatchQueue()
{
- final CountDownLatch signal = new CountDownLatch(1);
- _queue.add(new Dispatchable() {
- public void dispatch(AMQSession ssn)
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
{
- signal.countDown();
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
}
- });
- try
- {
- signal.await();
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ startDispatcherIfNecessary();
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0644bd88a8..1587d6a6bf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -414,9 +414,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void releaseForRollback()
{
- startDispatcherIfNecessary();
- syncDispatchQueue();
- _dispatcher.rollback();
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
_txRangeSet.clear();
_txSize = 0;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index d7196c0abb..bc1453beaf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -195,6 +195,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
public void releaseForRollback()
{
+ // Reject all the messages that have been received in this session and
+ // have not yet been acknowledged. Should look to remove
+ // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+ // Otherwise messages will be able to arrive out of order to a second
+ // consumer on the queue. Whilst this is within the JMS spec it is not
+ // user friendly and avoidable.
while (true)
{
Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
rejectMessage(tag, true);
}
-
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
}
public void rejectMessage(long deliveryTag, boolean requeue)