diff options
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 54 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 40 |
2 files changed, 93 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 48c4e3e3e6..90e3100690 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - private volatile boolean _usingDispatcherForCleanup; + protected volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -2247,6 +2247,58 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + void drainDispatchQueue() + { + if (Thread.currentThread() == _dispatcherThread) + { + while (!_closed.get() && !_queue.isEmpty()) + { + Dispatchable disp; + try + { + disp = (Dispatchable) _queue.take(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + // Check just in case _queue becomes empty, it shouldn't but + // better than an NPE. + if (disp == null) + { + _logger.debug("_queue became empty during sync."); + break; + } + + disp.dispatch(AMQSession.this); + } + } + else + { + startDispatcherIfNecessary(false); + + final CountDownLatch signal = new CountDownLatch(1); + + _queue.add(new Dispatchable() + { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + + try + { + signal.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + /** * Resubscribes all producers and consumers. This is called when performing failover. * diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8395c8f4b7..dcabfecac3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1354,5 +1354,45 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super.resubscribe(); getQpidSession().sync(); } + + @Override + void stop() throws AMQException + { + super.stop(); + synchronized (getMessageDeliveryLock()) + { + for (BasicMessageConsumer consumer : _consumers.values()) + { + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + _usingDispatcherForCleanup = true; + drainDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); + } + } |
