summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp40
1 files changed, 40 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 674781ac06..6b01d73197 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -77,6 +77,46 @@ Cpg::~Cpg() {
}
}
+void Cpg::join(const Name& group) {
+ check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+}
+
+void Cpg::leave(const Name& group) {
+ check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+}
+
+bool Cpg::isFlowControlEnabled() {
+ cpg_flow_control_state_t flowState;
+ check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+ return flowState == CPG_FLOW_CONTROL_ENABLED;
+}
+
+// TODO aconway 2008-08-07: better handling of flow control.
+// Wait for flow control to be disabled.
+void Cpg::waitForFlowControl() {
+ int delayNs=1000; // one millisecond
+ int tries=8; // double the delay on each try.
+ while (isFlowControlEnabled() && tries > 0) {
+ QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
+ ::usleep(delayNs);
+ --tries;
+ delayNs *= 2;
+ };
+ if (tries == 0) {
+ // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
+ throw Cpg::Exception("CPG flow control enabled, failed to send.");
+ }
+}
+
+void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
+ waitForFlowControl();
+ cpg_error_t result;
+ do {
+ result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
+ if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
+ } while(result == CPG_ERR_TRY_AGAIN);
+}
+
void Cpg::shutdown() {
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");