diff options
Diffstat (limited to 'qpid/java/client/src')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 3 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 43 |
2 files changed, 8 insertions, 38 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 784b75af10..48c4e3e3e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - protected volatile boolean _usingDispatcherForCleanup; + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 49b77dcc7b..8395c8f4b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - synchronized (getMessageDeliveryLock()) - { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - sync(); - List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); - } - } - - _usingDispatcherForCleanup = true; - syncDispatchQueue(); - _usingDispatcherForCleanup = false; - - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); - RangeSet all = RangeSetFactory.createRangeSet(delivered.size() - + prefetched.size()); - - for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) - { - Range range = deliveredIter.next(); - all.add(range); - } - - for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) - { - Range range = prefetchedIter.next(); - all.add(range); - } - - flushProcessed(all, false); - getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); - getQpidSession().messageRelease(prefetched); - sync(); + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + } } else { @@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); } } + |
