diff options
| author | Alan Conway <aconway@apache.org> | 2007-06-07 14:29:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-06-07 14:29:24 +0000 |
| commit | 50cd0cd44deb7b5c90e71eed1c9f3cf1666a70fe (patch) | |
| tree | 640da8126854a4e07c11ca73216b932c84e9d445 /cpp/src/qpid | |
| parent | fca1397c9ee37e92b57fd419186182cbec567435 (diff) | |
| download | qpid-python-50cd0cd44deb7b5c90e71eed1c9f3cf1666a70fe.tar.gz | |
Build support for clustering, initial CPG wrapper & tests.
- src/qpid/cluster/Cpg.cpp: C++ wrapper for the openais CPG library.
- src/tests/unit/Cpg.cpp: verify CPG functions in make check.
- makefiles etc.: build cluster stuff only if openais is installed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@545190 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 62 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 122 |
2 files changed, 184 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp new file mode 100644 index 0000000000..858d25f37c --- /dev/null +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Cpg.h" + +namespace qpid { +namespace cluster { + +using namespace std; + +string Cpg::errorStr(cpg_error_t err, const std::string& msg) { + switch (err) { + case CPG_OK: return msg+": ok"; + case CPG_ERR_LIBRARY: return msg+": library"; + case CPG_ERR_TIMEOUT: return msg+": timeout"; + case CPG_ERR_TRY_AGAIN: return msg+": try again"; + case CPG_ERR_INVALID_PARAM: return msg+": invalid param"; + case CPG_ERR_NO_MEMORY: return msg+": no memory"; + case CPG_ERR_BAD_HANDLE: return msg+": bad handle"; + case CPG_ERR_ACCESS: return msg+": access"; + case CPG_ERR_NOT_EXIST: return msg+": not exist"; + case CPG_ERR_EXIST: return msg+": exist"; + case CPG_ERR_NOT_SUPPORTED: return msg+": not supported"; + case CPG_ERR_SECURITY: return msg+": security"; + case CPG_ERR_TOO_MANY_GROUPS: return msg+": too many groups"; + default: + assert(0); + return ": unknown"; + }; +} + +std::string Cpg::cantJoinMsg(const Name& group) { + return "Cannot join CPG group "+group.str(); +} + +std::string Cpg::cantLeaveMsg(const Name& group) { + return "Cannot leave CPG group "+group.str(); +} + +std::string Cpg::cantMcastMsg(const Name& group) { + return "Cannot mcast to CPG group "+group.str(); +} + +}} // namespace qpid::cpg + + + diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h new file mode 100644 index 0000000000..6e61fa8a6e --- /dev/null +++ b/cpp/src/qpid/cluster/Cpg.h @@ -0,0 +1,122 @@ +#ifndef CPG_H +#define CPG_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <stdexcept> +#include <cassert> +#ifdef CLUSTER +extern "C" { +#include <openais/cpg.h> +} +#endif +namespace qpid { +namespace cluster { + +/** + * Lightweight C++ interface to cpg.h operations. + * Manages a single CPG handle, initialized in ctor, finialzed in destructor. + * On error all functions throw Cpg::Exception + */ +class Cpg { + public: + // FIXME aconway 2007-06-01: qpid::Exception + struct Exception : public std::runtime_error { + Exception(const std::string& msg) : runtime_error(msg) {} + }; + + struct Name : public cpg_name { + Name(const char* s) { copy(s, strlen(s)); } + Name(const char* s, size_t n) { copy(s,n); } + Name(const std::string& s) { copy(s.data(), s.size()); } + void copy(const char* s, size_t n) { + assert(n < CPG_MAX_NAME_LENGTH); + memcpy(value, s, n); + length=n; + } + + std::string str() const { return std::string(value, length); } + }; + + static inline std::string str(const cpg_name& n) { + return std::string(n.value, n.length); + } + + // TODO aconway 2007-06-01: when cpg handle supports a context pointer + // use callback objects (boost::function) instead of free functions. + // + /** Open a CPG handle. + *@param deliver - free function called when a message is delivered. + *@param reconfig - free function called when CPG configuration changes. + */ + Cpg(cpg_deliver_fn_t deliver, cpg_confchg_fn_t reconfig) { + cpg_callbacks_t callbacks = { deliver, reconfig }; + check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); + } + + /** Disconnect */ + ~Cpg() { + check(cpg_finalize(handle), "Cannot finalize CPG"); + } + + /** Dispatch CPG events. + *@param type one of + * - CPG_DISPATCH_ONE - dispatch exactly one event. + * - CPG_DISPATCH_ALL - dispatch all available events, don't wait. + * - CPG_DISPATCH_BLOCKING - blocking dispatch loop. + */ + void dispatch(cpg_dispatch_t type) { + check(cpg_dispatch(handle,type), "Error in CPG dispatch"); + } + + void join(const Name& group) { + check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); + }; + + void leave(const Name& group) { + check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); + } + + void mcast(const Name& group, const iovec* iov, int iovLen) { + check(cpg_mcast_joined( + handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen), + cantMcastMsg(group)); + } + + private: + static std::string errorStr(cpg_error_t err, const std::string& msg); + static std::string cantJoinMsg(const Name&); + static std::string cantLeaveMsg(const Name&); + static std::string cantMcastMsg(const Name&); + + static void check(cpg_error_t result, const std::string& msg) { + // TODO aconway 2007-06-01: Logging and exceptions. + if (result != CPG_OK) + throw Exception(errorStr(result, msg)); + } + cpg_handle_t handle; +}; + + + +}} // namespace qpid::cluster + + + +#endif /*!CPG_H*/ |
