diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.h')
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index cffbf0bdb3..6b81c602bd 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -39,6 +39,7 @@ namespace cluster { * On error all functions throw Cpg::Exception. * */ + class Cpg : public sys::IOHandle { public: struct Exception : public ::qpid::Exception { @@ -114,10 +115,73 @@ class Cpg : public sys::IOHandle { int getFd(); private: + + // Maximum number of retries for cog functions that can tell + // us to "try again later". + static const unsigned int cpgRetries = 5; + + // Don't let sleep-time between cpg retries to go above 0.1 second. + static const unsigned int maxCpgRetrySleep = 100000; + + + // Base class for the Cpg operations that need retry capability. + struct CpgOp { + std::string opName; + + CpgOp ( std::string opName ) + : opName(opName) { } + + virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; + virtual std::string msg(const Name&) = 0; + virtual ~CpgOp ( ) { } + }; + + + struct CpgJoinOp : public CpgOp { + CpgJoinOp ( ) + : CpgOp ( std::string("cpg_join") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + return cpg_join ( handle, group ); + } + + std::string msg(const Name& name) { return cantJoinMsg(name); } + }; + + struct CpgLeaveOp : public CpgOp { + CpgLeaveOp ( ) + : CpgOp ( std::string("cpg_leave") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + return cpg_leave ( handle, group ); + } + + std::string msg(const Name& name) { return cantLeaveMsg(name); } + }; + + struct CpgFinalizeOp : public CpgOp { + CpgFinalizeOp ( ) + : CpgOp ( std::string("cpg_finalize") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name *) { + return cpg_finalize ( handle ); + } + + std::string msg(const Name& name) { return cantFinalizeMsg(name); } + }; + + // This fn standardizes retry policy across all Cpg ops that need it. + void callCpg ( CpgOp & ); + + CpgJoinOp cpgJoinOp; + CpgLeaveOp cpgLeaveOp; + CpgFinalizeOp cpgFinalizeOp; + static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); + static std::string cantFinalizeMsg(const Name&); static Cpg* cpgFromHandle(cpg_handle_t); |
