summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.h')
-rw-r--r--cpp/src/qpid/cluster/Cpg.h35
1 files changed, 20 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 5ffd42e12a..2bd58cea1f 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -19,16 +19,15 @@
*
*/
-#include "qpid/cluster/types.h"
-#include "qpid/cluster/Dispatchable.h"
-
#include "qpid/Exception.h"
+#include "qpid/cluster/Dispatchable.h"
+#include "qpid/cluster/types.h"
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
#include <boost/scoped_ptr.hpp>
#include <cassert>
-
#include <string.h>
extern "C" {
@@ -38,7 +37,6 @@ extern "C" {
namespace qpid {
namespace cluster {
-
/**
* Lightweight C++ interface to cpg.h operations.
*
@@ -53,6 +51,7 @@ class Cpg : public sys::IOHandle {
};
struct Name : public cpg_name {
+ Name() { length = 0; }
Name(const char* s) { copy(s, strlen(s)); }
Name(const char* s, size_t n) { copy(s,n); }
Name(const std::string& s) { copy(s.data(), s.size()); }
@@ -105,17 +104,21 @@ class Cpg : public sys::IOHandle {
* - CPG_DISPATCH_ALL - dispatch all available events, don't wait.
* - CPG_DISPATCH_BLOCKING - blocking dispatch loop.
*/
- void dispatch(cpg_dispatch_t type) {
- check(cpg_dispatch(handle,type), "Error in CPG dispatch");
- }
+ void dispatch(cpg_dispatch_t type);
void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
- void join(const Name& group);
- void leave(const Name& group);
- void mcast(const Name& group, const iovec* iov, int iovLen);
+ void join(const std::string& group);
+ void leave();
+
+ /** Multicast to the group. NB: must not be called concurrently.
+ *
+ *@return true if the message was multi-cast, false if
+ * it was not sent due to flow control.
+ */
+ bool mcast(const iovec* iov, int iovLen);
cpg_handle_t getHandle() const { return handle; }
@@ -123,10 +126,13 @@ class Cpg : public sys::IOHandle {
int getFd();
+ bool isFlowControlEnabled();
+
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
- static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
+ static std::string cantLeaveMsg(const Name&);
+ static std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -150,12 +156,11 @@ class Cpg : public sys::IOHandle {
struct cpg_address *joined, int nJoined
);
- bool isFlowControlEnabled();
- void waitForFlowControl();
-
cpg_handle_t handle;
Handler& handler;
bool isShutdown;
+ Name group;
+ sys::Mutex dispatchLock;
};
inline bool operator==(const cpg_name& a, const cpg_name& b) {