diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 9564225190..d872b50458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -212,6 +212,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** + * + * @return the state of the async processor. + */ + public boolean isProcessingAsync() + { + return _processing.get(); + } + + /** * Returns all the messages in the Queue * * @return List of messages @@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { addMessageToQueue(msg, deliverFirst); + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! + if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) + { + _queue.deliverAsync(); + } + //release lock now message is on queue. _lock.unlock(); @@ -975,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { public void run() { + String startName = Thread.currentThread().getName(); + Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName); boolean running = true; while (running && !_movingMessages.get()) { @@ -990,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processing.set(false); } } + Thread.currentThread().setName(startName); } } @@ -1016,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + + "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + + ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + |
