diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index b02fa16ae9..34614dc1ef 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -68,21 +68,27 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) - break; // cpg.mcast returns false for flow control - QPID_LOG(trace, " MCAST " << *i); - ++i; if (mcastMax) { sys::Mutex::ScopedLock l(lock); - assert(pending < mcastMax); - if (++pending == mcastMax) { + if (pending == mcastMax) { queue.stop(); break ; } + ++pending; + } + iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + --pending; + } + break; } + QPID_LOG(trace, " MCAST " << *i); + ++i; } - values.erase(values.begin(), i); + values.erase(values.begin(), i); // Erase sent events. } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); @@ -98,9 +104,10 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::delivered(const Event&) { +void Multicaster::selfDeliver(const Event&) { sys::Mutex::ScopedLock l(lock); if (mcastMax) { + assert(pending > 0); assert(pending <= mcastMax); if (pending == mcastMax) queue.start(); |
