summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java23
1 files changed, 21 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 9564225190..d872b50458 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -212,6 +212,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
+ *
+ * @return the state of the async processor.
+ */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
@@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
addMessageToQueue(msg, deliverFirst);
+ //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+ if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
//release lock now message is on queue.
_lock.unlock();
@@ -975,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -990,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processing.set(false);
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -1016,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+ ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +