diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 48 |
1 files changed, 32 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index a979ce1eeb..3148e52789 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -17,7 +17,10 @@ */ #include "Cpg.h" + #include "qpid/sys/Mutex.h" +#include "qpid/log/Statement.h" + #include <vector> #include <limits> #include <iterator> @@ -33,16 +36,15 @@ using namespace std; class Cpg::Handles { public: - void put(cpg_handle_t handle, Cpg* object) { + void put(cpg_handle_t handle, Cpg::Handler* handler) { sys::Mutex::ScopedLock l(lock); - assert(object); uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. if (index >= handles.size()) handles.resize(index+1, 0); - handles[index] = object; + handles[index] = handler; } - Cpg* get(cpg_handle_t handle) { + Cpg::Handler* get(cpg_handle_t handle) { sys::Mutex::ScopedLock l(lock); uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. assert(index < handles.size()); @@ -52,7 +54,7 @@ class Cpg::Handles private: sys::Mutex lock; - vector<Cpg*> handles; + vector<Cpg::Handler*> handles; }; Cpg::Handles Cpg::handles; @@ -66,7 +68,9 @@ void Cpg::globalDeliver ( void* msg, int msg_len) { - handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len); + Cpg::Handler* handler=handles.get(handle); + if (handler) + handler->deliver(handle, group, nodeid, pid, msg, msg_len); } void Cpg::globalConfigChange( @@ -77,23 +81,35 @@ void Cpg::globalConfigChange( struct cpg_address *joined, int nJoined ) { - handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); + Cpg::Handler* handler=handles.get(handle); + if (handler) + handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } -Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c) -{ +Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - handles.put(handle, this); + handles.put(handle, &handler); } Cpg::~Cpg() { try { - check(cpg_finalize(handle), "Error in shutdown of CPG"); + shutdown(); + } catch (const std::exception& e) { + QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what()); } - catch (...) { - handles.put(handle, 0); - throw; +} + +struct Cpg::ClearHandleOnExit { + ClearHandleOnExit(cpg_handle_t h) : handle(h) {} + ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); } + cpg_handle_t handle; +}; + +void Cpg::shutdown() { + if (handles.get(handle)) { + ClearHandleOnExit guard(handle); // Exception safe + check(cpg_finalize(handle), "Error in shutdown of CPG"); } } @@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) { case CPG_OK: return msg+": ok"; case CPG_ERR_LIBRARY: return msg+": library"; case CPG_ERR_TIMEOUT: return msg+": timeout"; - case CPG_ERR_TRY_AGAIN: return msg+": try again"; + case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running"; case CPG_ERR_INVALID_PARAM: return msg+": invalid param"; case CPG_ERR_NO_MEMORY: return msg+": no memory"; case CPG_ERR_BAD_HANDLE: return msg+": bad handle"; - case CPG_ERR_ACCESS: return msg+": access"; + case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'"; case CPG_ERR_NOT_EXIST: return msg+": not exist"; case CPG_ERR_EXIST: return msg+": exist"; case CPG_ERR_NOT_SUPPORTED: return msg+": not supported"; |
