From ea120fa96d784bcca44c92329a3f1598841522fb Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 17 Oct 2007 16:59:42 +0000 Subject: QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585575 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 23 ++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 9564225190..d872b50458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -211,6 +211,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * + * @return the state of the async processor. + */ + public boolean isProcessingAsync() + { + return _processing.get(); + } + /** * Returns all the messages in the Queue * @@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { addMessageToQueue(msg, deliverFirst); + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! + if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) + { + _queue.deliverAsync(); + } + //release lock now message is on queue. _lock.unlock(); @@ -975,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { public void run() { + String startName = Thread.currentThread().getName(); + Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName); boolean running = true; while (running && !_movingMessages.get()) { @@ -990,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processing.set(false); } } + Thread.currentThread().setName(startName); } } @@ -1016,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + + "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + + ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + -- cgit v1.2.1