diff options
| author | Alan Conway <aconway@apache.org> | 2009-01-09 14:47:21 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-01-09 14:47:21 +0000 |
| commit | bd5ae249ef74e1707eb05dd6cc70bb816e318757 (patch) | |
| tree | 8bddf4750138996b45b1c099b2619a9a836f2015 /cpp/src/qpid/cluster/Multicaster.cpp | |
| parent | 29efbadc913a3386ba5877512e0a8887504d4a42 (diff) | |
| download | qpid-python-bd5ae249ef74e1707eb05dd6cc70bb816e318757.tar.gz | |
Fix --cluster-mast-max: errors in last commit.
Work around problems with CPG flow control.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@733051 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index b02fa16ae9..34614dc1ef 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -68,21 +68,27 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) - break; // cpg.mcast returns false for flow control - QPID_LOG(trace, " MCAST " << *i); - ++i; if (mcastMax) { sys::Mutex::ScopedLock l(lock); - assert(pending < mcastMax); - if (++pending == mcastMax) { + if (pending == mcastMax) { queue.stop(); break ; } + ++pending; + } + iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + --pending; + } + break; } + QPID_LOG(trace, " MCAST " << *i); + ++i; } - values.erase(values.begin(), i); + values.erase(values.begin(), i); // Erase sent events. } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); @@ -98,9 +104,10 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::delivered(const Event&) { +void Multicaster::selfDeliver(const Event&) { sys::Mutex::ScopedLock l(lock); if (mcastMax) { + assert(pending > 0); assert(pending <= mcastMax); if (pending == mcastMax) queue.start(); |
