From 2100b4498daa1d89e2850196dca19bae1b4a6151 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 7 Aug 2008 20:46:18 +0000 Subject: Check CPG flow control. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683711 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Cpg.cpp | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) (limited to 'cpp/src/qpid/cluster/Cpg.cpp') diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 674781ac06..6b01d73197 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -77,6 +77,46 @@ Cpg::~Cpg() { } } +void Cpg::join(const Name& group) { + check(cpg_join(handle, const_cast(&group)),cantJoinMsg(group)); +} + +void Cpg::leave(const Name& group) { + check(cpg_leave(handle,const_cast(&group)),cantLeaveMsg(group)); +} + +bool Cpg::isFlowControlEnabled() { + cpg_flow_control_state_t flowState; + check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + return flowState == CPG_FLOW_CONTROL_ENABLED; +} + +// TODO aconway 2008-08-07: better handling of flow control. +// Wait for flow control to be disabled. +void Cpg::waitForFlowControl() { + int delayNs=1000; // one millisecond + int tries=8; // double the delay on each try. + while (isFlowControlEnabled() && tries > 0) { + QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns"); + ::usleep(delayNs); + --tries; + delayNs *= 2; + }; + if (tries == 0) { + // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition. + throw Cpg::Exception("CPG flow control enabled, failed to send."); + } +} + +void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) { + waitForFlowControl(); + cpg_error_t result; + do { + result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast(iov), iovLen); + if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); + } while(result == CPG_ERR_TRY_AGAIN); +} + void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); -- cgit v1.2.1