From b83d2c7f5d2cb6ef51459574f7fa4fe4bd629563 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 17 May 2007 12:12:34 +0000 Subject: Fix for broken CSDM message purging routine that was causing python test_get to fail. Replaced long while control with a method call that is easier to understand and has more comments. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@538882 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 56 +++++++++++++++++++--- 1 file changed, 49 insertions(+), 7 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index bdc2189676..0fb5e6d88a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (message != null - && ( - ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) - || sub == null) - && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired - || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired - ) + while (purgeMessage(message, sub)) { //remove the already taken message or expired AMQMessage removed = messages.poll(); @@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } + /** + * + * @param message + * @param sub + * @return + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException + { + //Original.. complicated while loop control +// (message != null +// && ( +// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) +// || sub == null) +// && message.taken(_queue, sub)); + + boolean purge = false; + + // if the message is null then don't purge as we have no messagse. + if (message != null) + { + // if we have a subscriber perform message checks + if (sub != null) + { + // Check that the message hasn't expired. + if (message.expired(sub.getChannel().getStoreContext(), _queue)) + { + return true; + } + + // if we have a queue browser(we don't purge) so check mark the message as taken + purge = ((!sub.isBrowser() || message.isTaken(_queue))); + } + else + { + // if there is no subscription we are doing + // a get or purging so mark message as taken. + message.isTaken(_queue); + // and then ensure that it gets purged + purge = true; + } + } + + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue messageQueue) { -- cgit v1.2.1