summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
committerAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
commit83b4417af81df92cb640de1694488156ba29d85f (patch)
tree630449e321fb571476080b737febd841e605ff2d /cpp/src/qpid/cluster/Cpg.cpp
parenta36bef1975b1d273a65dd0e74994106fbaad4389 (diff)
downloadqpid-python-83b4417af81df92cb640de1694488156ba29d85f.tar.gz
2007-06-30 <aconway@redhat.com>
* src/qpid/cluster/Cluster.cpp: Refactor - expose 4 handler points for all traffic to/from cluster. Removed HandlerUpdater functionality, separate class. Cluster only deals with membership and connecting the 4 handler points to CPG multicast. * src/tests/cluster.mk: Dropped newgrp ais wrapper scripts, its much simpler if the user just does "newgrp ais" before building. * src/tests/ais_check: Test script to check if users gid is ais and give clear notice if not. * src/tests/Cluster.cpp: Updated for changes to Cluster. * src/qpid/cluster/Cpg.cpp: Better messages for common errors. * Handler.h: Remove nextHandler() minor convenience is outweighted by risk of undetected errors if handlers that expect next() to be set are called when it's not set. * src/qpid/cluster/Cpg.cpp: Added logging. Replaced boost::function with traditional virtual interface (nasty stack traces.) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
1 files changed, 32 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index a979ce1eeb..3148e52789 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,7 +17,10 @@
*/
#include "Cpg.h"
+
#include "qpid/sys/Mutex.h"
+#include "qpid/log/Statement.h"
+
#include <vector>
#include <limits>
#include <iterator>
@@ -33,16 +36,15 @@ using namespace std;
class Cpg::Handles
{
public:
- void put(cpg_handle_t handle, Cpg* object) {
+ void put(cpg_handle_t handle, Cpg::Handler* handler) {
sys::Mutex::ScopedLock l(lock);
- assert(object);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
if (index >= handles.size())
handles.resize(index+1, 0);
- handles[index] = object;
+ handles[index] = handler;
}
- Cpg* get(cpg_handle_t handle) {
+ Cpg::Handler* get(cpg_handle_t handle) {
sys::Mutex::ScopedLock l(lock);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
assert(index < handles.size());
@@ -52,7 +54,7 @@ class Cpg::Handles
private:
sys::Mutex lock;
- vector<Cpg*> handles;
+ vector<Cpg::Handler*> handles;
};
Cpg::Handles Cpg::handles;
@@ -66,7 +68,9 @@ void Cpg::globalDeliver (
void* msg,
int msg_len)
{
- handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -77,23 +81,35 @@ void Cpg::globalConfigChange(
struct cpg_address *joined, int nJoined
)
{
- handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
-{
+Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, this);
+ handles.put(handle, &handler);
}
Cpg::~Cpg() {
try {
- check(cpg_finalize(handle), "Error in shutdown of CPG");
+ shutdown();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
}
- catch (...) {
- handles.put(handle, 0);
- throw;
+}
+
+struct Cpg::ClearHandleOnExit {
+ ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
+ ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
+ cpg_handle_t handle;
+};
+
+void Cpg::shutdown() {
+ if (handles.get(handle)) {
+ ClearHandleOnExit guard(handle); // Exception safe
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
case CPG_OK: return msg+": ok";
case CPG_ERR_LIBRARY: return msg+": library";
case CPG_ERR_TIMEOUT: return msg+": timeout";
- case CPG_ERR_TRY_AGAIN: return msg+": try again";
+ case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running";
case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
case CPG_ERR_NO_MEMORY: return msg+": no memory";
case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
- case CPG_ERR_ACCESS: return msg+": access";
+ case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'";
case CPG_ERR_NOT_EXIST: return msg+": not exist";
case CPG_ERR_EXIST: return msg+": exist";
case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";