summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-19 15:07:54 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-19 15:07:54 +0000
commit281b4220b2c53c430542a152f1d925c4cc077305 (patch)
treef2f951a3211a6224e0ca4db2aa314ddc300f2241 /java
parent667a4db08873b4933870f8c989f32ead15dd648c (diff)
downloadqpid-python-281b4220b2c53c430542a152f1d925c4cc077305.tar.gz
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
ConcurrentSelectorDeliveryManager - method changes from hasFilter to filtersMessages. Forgot to include the file in the commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java20
1 files changed, 17 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 979f692361..1f92cee1df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.hasFilters())
+ if (!sub.isSuspended() && sub.filtersMessages())
{
Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processingThreadName = Thread.currentThread().getName();
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
}
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
+ }
+
}
// private void sendNextMessage(Subscription sub)
// {
-// if (sub.hasFilters())
+// if (sub.filtersMessages())
// {
// sendNextMessage(sub, sub.getPreDeliveryQueue());
// if (sub.isAutoClose())
@@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
executor.execute(asyncDelivery);
}
}