summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp33
1 files changed, 21 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 16e91fc1cf..9586f6b994 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -124,21 +124,20 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer* c, bool sync){
+void Queue::requestDispatch(Consumer* c){
if (!c || c->preAcquires()) {
- if (sync) {
- Mutex::ScopedLock locker(messageLock);
- dispatch();
- } else {
- serializer.execute(dispatchCallback);
- }
+ serializer.execute(dispatchCallback);
} else {
- //note: this is always done on the callers thread, regardless
- // of sync; browsers of large queues should use flow control!
serviceBrowser(c);
}
}
+void Queue::flush(DispatchCompletion& completion)
+{
+ DispatchFunctor f(*this, &completion);
+ serializer.execute(f);
+}
+
Consumer* Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
@@ -179,9 +178,18 @@ void Queue::dispatch(){
} else {
break;
}
- }
- RWlock::ScopedRlock locker(consumerLock);
- for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {
+ }
+ serviceAllBrowsers();
+}
+
+void Queue::serviceAllBrowsers()
+{
+ Consumers copy;
+ {
+ RWlock::ScopedRlock locker(consumerLock);
+ copy = browsers;
+ }
+ for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
serviceBrowser(*i);
}
}
@@ -428,3 +436,4 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
{
return alternateExchange;
}
+