summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java56
1 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index bdc2189676..0fb5e6d88a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (message != null
- && (
- ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
- || sub == null)
- && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
- || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
- )
+ while (purgeMessage(message, sub))
{
//remove the already taken message or expired
AMQMessage removed = messages.poll();
@@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
+ /**
+ *
+ * @param message
+ * @param sub
+ * @return
+ * @throws AMQException
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ {
+ //Original.. complicated while loop control
+// (message != null
+// && (
+// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+// || sub == null)
+// && message.taken(_queue, sub));
+
+ boolean purge = false;
+
+ // if the message is null then don't purge as we have no messagse.
+ if (message != null)
+ {
+ // if we have a subscriber perform message checks
+ if (sub != null)
+ {
+ // Check that the message hasn't expired.
+ if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ {
+ return true;
+ }
+
+ // if we have a queue browser(we don't purge) so check mark the message as taken
+ purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ }
+ else
+ {
+ // if there is no subscription we are doing
+ // a get or purging so mark message as taken.
+ message.isTaken(_queue);
+ // and then ensure that it gets purged
+ purge = true;
+ }
+ }
+
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+ return purge && message.taken(_queue, sub);
+ }
+
public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{