diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.h')
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 35 |
1 files changed, 20 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 5ffd42e12a..2bd58cea1f 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -19,16 +19,15 @@ * */ -#include "qpid/cluster/types.h" -#include "qpid/cluster/Dispatchable.h" - #include "qpid/Exception.h" +#include "qpid/cluster/Dispatchable.h" +#include "qpid/cluster/types.h" #include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" #include <boost/scoped_ptr.hpp> #include <cassert> - #include <string.h> extern "C" { @@ -38,7 +37,6 @@ extern "C" { namespace qpid { namespace cluster { - /** * Lightweight C++ interface to cpg.h operations. * @@ -53,6 +51,7 @@ class Cpg : public sys::IOHandle { }; struct Name : public cpg_name { + Name() { length = 0; } Name(const char* s) { copy(s, strlen(s)); } Name(const char* s, size_t n) { copy(s,n); } Name(const std::string& s) { copy(s.data(), s.size()); } @@ -105,17 +104,21 @@ class Cpg : public sys::IOHandle { * - CPG_DISPATCH_ALL - dispatch all available events, don't wait. * - CPG_DISPATCH_BLOCKING - blocking dispatch loop. */ - void dispatch(cpg_dispatch_t type) { - check(cpg_dispatch(handle,type), "Error in CPG dispatch"); - } + void dispatch(cpg_dispatch_t type); void dispatchOne() { dispatch(CPG_DISPATCH_ONE); } void dispatchAll() { dispatch(CPG_DISPATCH_ALL); } void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); } - void join(const Name& group); - void leave(const Name& group); - void mcast(const Name& group, const iovec* iov, int iovLen); + void join(const std::string& group); + void leave(); + + /** Multicast to the group. NB: must not be called concurrently. + * + *@return true if the message was multi-cast, false if + * it was not sent due to flow control. + */ + bool mcast(const iovec* iov, int iovLen); cpg_handle_t getHandle() const { return handle; } @@ -123,10 +126,13 @@ class Cpg : public sys::IOHandle { int getFd(); + bool isFlowControlEnabled(); + private: static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); - static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&); + static std::string cantLeaveMsg(const Name&); + static std::string cantMcastMsg(const Name&); static void check(cpg_error_t result, const std::string& msg) { if (result != CPG_OK) throw Exception(errorStr(result, msg)); @@ -150,12 +156,11 @@ class Cpg : public sys::IOHandle { struct cpg_address *joined, int nJoined ); - bool isFlowControlEnabled(); - void waitForFlowControl(); - cpg_handle_t handle; Handler& handler; bool isShutdown; + Name group; + sys::Mutex dispatchLock; }; inline bool operator==(const cpg_name& a, const cpg_name& b) { |
