summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
1 files changed, 32 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index a979ce1eeb..3148e52789 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,7 +17,10 @@
*/
#include "Cpg.h"
+
#include "qpid/sys/Mutex.h"
+#include "qpid/log/Statement.h"
+
#include <vector>
#include <limits>
#include <iterator>
@@ -33,16 +36,15 @@ using namespace std;
class Cpg::Handles
{
public:
- void put(cpg_handle_t handle, Cpg* object) {
+ void put(cpg_handle_t handle, Cpg::Handler* handler) {
sys::Mutex::ScopedLock l(lock);
- assert(object);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
if (index >= handles.size())
handles.resize(index+1, 0);
- handles[index] = object;
+ handles[index] = handler;
}
- Cpg* get(cpg_handle_t handle) {
+ Cpg::Handler* get(cpg_handle_t handle) {
sys::Mutex::ScopedLock l(lock);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
assert(index < handles.size());
@@ -52,7 +54,7 @@ class Cpg::Handles
private:
sys::Mutex lock;
- vector<Cpg*> handles;
+ vector<Cpg::Handler*> handles;
};
Cpg::Handles Cpg::handles;
@@ -66,7 +68,9 @@ void Cpg::globalDeliver (
void* msg,
int msg_len)
{
- handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -77,23 +81,35 @@ void Cpg::globalConfigChange(
struct cpg_address *joined, int nJoined
)
{
- handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
-{
+Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, this);
+ handles.put(handle, &handler);
}
Cpg::~Cpg() {
try {
- check(cpg_finalize(handle), "Error in shutdown of CPG");
+ shutdown();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
}
- catch (...) {
- handles.put(handle, 0);
- throw;
+}
+
+struct Cpg::ClearHandleOnExit {
+ ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
+ ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
+ cpg_handle_t handle;
+};
+
+void Cpg::shutdown() {
+ if (handles.get(handle)) {
+ ClearHandleOnExit guard(handle); // Exception safe
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
case CPG_OK: return msg+": ok";
case CPG_ERR_LIBRARY: return msg+": library";
case CPG_ERR_TIMEOUT: return msg+": timeout";
- case CPG_ERR_TRY_AGAIN: return msg+": try again";
+ case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running";
case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
case CPG_ERR_NO_MEMORY: return msg+": no memory";
case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
- case CPG_ERR_ACCESS: return msg+": access";
+ case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'";
case CPG_ERR_NOT_EXIST: return msg+": not exist";
case CPG_ERR_EXIST: return msg+": exist";
case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";