From bd5ae249ef74e1707eb05dd6cc70bb816e318757 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 9 Jan 2009 14:47:21 +0000 Subject: Fix --cluster-mast-max: errors in last commit. Work around problems with CPG flow control. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@733051 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Multicaster.cpp | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) (limited to 'cpp/src/qpid/cluster/Multicaster.cpp') diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index b02fa16ae9..34614dc1ef 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -68,21 +68,27 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - iovec iov = { const_cast(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) - break; // cpg.mcast returns false for flow control - QPID_LOG(trace, " MCAST " << *i); - ++i; if (mcastMax) { sys::Mutex::ScopedLock l(lock); - assert(pending < mcastMax); - if (++pending == mcastMax) { + if (pending == mcastMax) { queue.stop(); break ; } + ++pending; + } + iovec iov = { const_cast(i->getStore()), i->getStoreSize() }; + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + --pending; + } + break; } + QPID_LOG(trace, " MCAST " << *i); + ++i; } - values.erase(values.begin(), i); + values.erase(values.begin(), i); // Erase sent events. } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); @@ -98,9 +104,10 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::delivered(const Event&) { +void Multicaster::selfDeliver(const Event&) { sys::Mutex::ScopedLock l(lock); if (mcastMax) { + assert(pending > 0); assert(pending <= mcastMax); if (pending == mcastMax) queue.start(); -- cgit v1.2.1