summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java40
2 files changed, 93 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 48c4e3e3e6..90e3100690 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- private volatile boolean _usingDispatcherForCleanup;
+ protected volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -2247,6 +2247,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ void drainDispatchQueue()
+ {
+ if (Thread.currentThread() == _dispatcherThread)
+ {
+ while (!_closed.get() && !_queue.isEmpty())
+ {
+ Dispatchable disp;
+ try
+ {
+ disp = (Dispatchable) _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // Check just in case _queue becomes empty, it shouldn't but
+ // better than an NPE.
+ if (disp == null)
+ {
+ _logger.debug("_queue became empty during sync.");
+ break;
+ }
+
+ disp.dispatch(AMQSession.this);
+ }
+ }
+ else
+ {
+ startDispatcherIfNecessary(false);
+
+ final CountDownLatch signal = new CountDownLatch(1);
+
+ _queue.add(new Dispatchable()
+ {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Resubscribes all producers and consumers. This is called when performing failover.
*
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 8395c8f4b7..dcabfecac3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1354,5 +1354,45 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
super.resubscribe();
getQpidSession().sync();
}
+
+ @Override
+ void stop() throws AMQException
+ {
+ super.stop();
+ synchronized (getMessageDeliveryLock())
+ {
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+ _prefetchedMessageTags.addAll(tags);
+ }
+ }
+ _usingDispatcherForCleanup = true;
+ drainDispatchQueue();
+ _usingDispatcherForCleanup = false;
+
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+ + prefetched.size());
+
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+ sync();
+ }
+
}