From dbb447da624494db2f376837feab22fb5db989fb Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 12 Aug 2009 21:08:51 +0000 Subject: Batch multiple events into a single CPG multicast. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803713 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Multicaster.cpp | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'cpp/src/qpid/cluster/Multicaster.cpp') diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 7e97963318..4123e11c92 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,10 +24,14 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" +#include +#include namespace qpid { namespace cluster { +static const int MCAST_IOV_MAX=63; // Limit imposed by CPG + Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr& poller, boost::function onError_) : @@ -36,7 +40,8 @@ Multicaster::Multicaster(Cpg& cpg_, #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + ioVector(MCAST_IOV_MAX) { queue.start(); } @@ -70,26 +75,29 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } - -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( + const PollableEventQueue::Batch& events) +{ + PollableEventQueue::Batch::const_iterator i = events.begin(); try { - PollableEventQueue::Batch::const_iterator i = values.begin(); - while( i != values.end()) { - iovec iov = i->toIovec(); - if (!cpg.mcast(&iov, 1)) { - // cpg didn't send because of CPG flow control. - break; + while (i < events.end()) { + size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); + std::transform(i, i+count, ioVector.begin(), + boost::bind(&Event::toIovec, _1)); + if (!cpg.mcast(&ioVector.front(), count)) { + QPID_LOG(trace, "CPG flow control, will resend " + << events.end() - i << " events"); + break; } - ++i; + i += count; } - return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); - return values.end(); } + return i; } void Multicaster::release() { -- cgit v1.2.1