From 11e35dd461a19e23053756cee5c4ec214fa5597f Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 9 Jan 2009 04:50:35 +0000 Subject: Added --cluster-read-max: max number of outstanding mcasts in CPG buffers. Work around problems with CPG flow control. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732925 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Multicaster.cpp | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) (limited to 'cpp/src/qpid/cluster/Multicaster.cpp') diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index a106ec128b..b02fa16ae9 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -28,10 +28,14 @@ namespace qpid { namespace cluster { -Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr& poller, boost::function onError_) : - onError(onError_), cpg(cpg_), +Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_, + const boost::shared_ptr& poller, + boost::function onError_) : + onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + mcastMax(mcastMax_), + pending(0) { queue.start(); } @@ -56,6 +60,7 @@ void Multicaster::mcast(const Event& e) { } } queue.push(e); + } @@ -64,9 +69,18 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { iovec iov = { const_cast(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) break; // returns false for flow control + if (!cpg.mcast(&iov, 1)) + break; // cpg.mcast returns false for flow control QPID_LOG(trace, " MCAST " << *i); ++i; + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + assert(pending < mcastMax); + if (++pending == mcastMax) { + queue.stop(); + break ; + } + } } values.erase(values.begin(), i); } @@ -84,4 +98,14 @@ void Multicaster::release() { holdingQueue.clear(); } +void Multicaster::delivered(const Event&) { + sys::Mutex::ScopedLock l(lock); + if (mcastMax) { + assert(pending <= mcastMax); + if (pending == mcastMax) + queue.start(); + --pending; + } +} + }} // namespace qpid::cluster -- cgit v1.2.1