From 281b4220b2c53c430542a152f1d925c4cc077305 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 19 Apr 2007 15:07:54 +0000 Subject: QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test. ConcurrentSelectorDeliveryManager - method changes from hasFilter to filtersMessages. Forgot to include the file in the commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530441 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 979f692361..1f92cee1df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.Executor; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicInteger; @@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended() && sub.hasFilters()) + if (!sub.isSuspended() && sub.filtersMessages()) { Queue preDeliveryQueue = sub.getPreDeliveryQueue(); for (AMQMessage msg : messageList) @@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processingThreadName = Thread.currentThread().getName(); } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Running process Queue." + currentStatus()); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } } + + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Done process Queue." + currentStatus()); + } + } // private void sendNextMessage(Subscription sub) // { -// if (sub.hasFilters()) +// if (sub.filtersMessages()) // { // sendNextMessage(sub, sub.getPreDeliveryQueue()); // if (sub.isAutoClose()) @@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Executing Async process."); + } executor.execute(asyncDelivery); } } -- cgit v1.2.1