diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 56 |
1 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index bdc2189676..0fb5e6d88a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (message != null - && ( - ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) - || sub == null) - && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired - || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired - ) + while (purgeMessage(message, sub)) { //remove the already taken message or expired AMQMessage removed = messages.poll(); @@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } + /** + * + * @param message + * @param sub + * @return + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException + { + //Original.. complicated while loop control +// (message != null +// && ( +// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) +// || sub == null) +// && message.taken(_queue, sub)); + + boolean purge = false; + + // if the message is null then don't purge as we have no messagse. + if (message != null) + { + // if we have a subscriber perform message checks + if (sub != null) + { + // Check that the message hasn't expired. + if (message.expired(sub.getChannel().getStoreContext(), _queue)) + { + return true; + } + + // if we have a queue browser(we don't purge) so check mark the message as taken + purge = ((!sub.isBrowser() || message.isTaken(_queue))); + } + else + { + // if there is no subscription we are doing + // a get or purging so mark message as taken. + message.isTaken(_queue); + // and then ensure that it gets purged + purge = true; + } + } + + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) { |
