diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 674781ac06..6b01d73197 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -77,6 +77,46 @@ Cpg::~Cpg() { } } +void Cpg::join(const Name& group) { + check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); +} + +void Cpg::leave(const Name& group) { + check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); +} + +bool Cpg::isFlowControlEnabled() { + cpg_flow_control_state_t flowState; + check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + return flowState == CPG_FLOW_CONTROL_ENABLED; +} + +// TODO aconway 2008-08-07: better handling of flow control. +// Wait for flow control to be disabled. +void Cpg::waitForFlowControl() { + int delayNs=1000; // one millisecond + int tries=8; // double the delay on each try. + while (isFlowControlEnabled() && tries > 0) { + QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns"); + ::usleep(delayNs); + --tries; + delayNs *= 2; + }; + if (tries == 0) { + // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition. + throw Cpg::Exception("CPG flow control enabled, failed to send."); + } +} + +void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) { + waitForFlowControl(); + cpg_error_t result; + do { + result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); + if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); + } while(result == CPG_ERR_TRY_AGAIN); +} + void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); |
