From 0e107c3844c7078cf57212f16b1335dd50d4364c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 7 Nov 2008 20:48:38 +0000 Subject: broker/Message, IncompleteMessageList: drop waitFor(De|En)Complete, replace with callbacks. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@712258 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/IncompleteMessageList.cpp | 31 +++++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) (limited to 'cpp/src/qpid/broker/IncompleteMessageList.cpp') diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index edb3721a40..64562dfb57 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -18,38 +18,55 @@ * under the License. * */ -#include "IncompleteMessageList.h" -#include "Message.h" +#include "IncompleteMessageList.h" namespace qpid { namespace broker { +IncompleteMessageList::IncompleteMessageList() : + callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1)) +{} + void IncompleteMessageList::add(boost::intrusive_ptr msg) { + sys::Mutex::ScopedLock l(lock); + msg->setEnqueueCompleteCallback(callback); incomplete.push_back(msg); } -void IncompleteMessageList::process(const CompletionListener& l, bool sync) +void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr& ) { + sys::Mutex::ScopedLock l(lock); + lock.notify(); +} + +void IncompleteMessageList::process(const CompletionListener& listen, bool sync) { + sys::Mutex::ScopedLock l(lock); while (!incomplete.empty()) { boost::intrusive_ptr& msg = incomplete.front(); if (!msg->isEnqueueComplete()) { if (sync){ msg->flush(); - msg->waitForEnqueueComplete(); + while (!msg->isEnqueueComplete()) + lock.wait(); } else { //leave the message as incomplete for now return; } } - l(msg); + listen(msg); incomplete.pop_front(); } } -void IncompleteMessageList::each(const CompletionListener& l) { - std::for_each(incomplete.begin(), incomplete.end(), l); +void IncompleteMessageList::each(const CompletionListener& listen) { + Messages snapshot; + { + sys::Mutex::ScopedLock l(lock); + snapshot = incomplete; + } + std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value? } }} -- cgit v1.2.1