summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-23 13:08:16 +0000
committerAlan Conway <aconway@apache.org>2007-07-23 13:08:16 +0000
commit1a469b992ef2f28d98f43e63cf4d520c1bf830a4 (patch)
treec743052e2d2ab10a28960234e3efd3534cdb14c1 /cpp/src/qpid/cluster
parent4ab144d3d0a48a4abc1814e3244ef830344f19b2 (diff)
downloadqpid-python-1a469b992ef2f28d98f43e63cf4d520c1bf830a4.tar.gz
* src/tests/cluster.mk: Enable cluster test.
* src/tests/Cluster.h (class TestHandler): Fixed race in TestHandler::waitFor * src/tests/Cluster.cpp - Allow separate start of parent and child processes. * src/qpid/Options.cpp (parse): Skip argv parsing if argc=0. * src/qpid/cluster/Cluster.cpp (configChange): assert group name. * src/qpid/cluster/Cpg.cpp, .h: Additional logging * src/qpid/framing/AMQFrame.cpp: Initialize all fields in ctor, avoid valgrind warning. * src/qpid/log/Logger.cpp: Initialize singleton automatically from environment so logging can be used on tests. * src/qpid/sys/Time.h: Avoid overflow in AbsTime(t, TIME_INFINITE) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558710 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp34
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cpg.h5
3 files changed, 32 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 256378ccd5..b59bfe878d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -115,13 +115,14 @@ Cluster::MemberList Cluster::getMembers() const {
}
void Cluster::deliver(
- cpg_handle_t /*handle*/,
- struct cpg_name* /* group */,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
+ cpg_handle_t /*handle*/,
+ cpg_name* group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
{
+ assert(name == *group);
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
SessionFrame frame;
@@ -149,26 +150,27 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
assert(notifyIn);
- MemberList list;
- {
- Mutex::ScopedLock l(lock);
+ MemberList list;
+ {
+ Mutex::ScopedLock l(lock);
shared_ptr<Member>& member=members[from];
if (!member)
member.reset(new Member(notifyIn->getUrl()));
- else
+ else
member->url = notifyIn->getUrl();
- lock.notifyAll();
+ lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
- }
+ }
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*current*/, int /*nCurrent*/,
- struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined)
+ cpg_name *group,
+ cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined)
{
+ assert(name == *group);
bool newMembers=false;
MemberList updated;
{
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 3148e52789..87e483141e 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -90,25 +90,22 @@ Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
handles.put(handle, &handler);
+ QPID_LOG(debug, "Initialize CPG handle " << handle);
}
Cpg::~Cpg() {
try {
shutdown();
} catch (const std::exception& e) {
- QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
+ QPID_LOG(error, "Exception in Cpg destructor: " << e.what());
}
}
-struct Cpg::ClearHandleOnExit {
- ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
- ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
- cpg_handle_t handle;
-};
-
void Cpg::shutdown() {
+ QPID_LOG(debug, "Shutdown CPG handle " << handle);
if (handles.get(handle)) {
- ClearHandleOnExit guard(handle); // Exception safe
+ QPID_LOG(debug, "Finalize CPG handle " << handle);
+ handles.put(handle, 0);
check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -173,8 +170,11 @@ ostream& operator <<(ostream& out, const Cpg::Id& id) {
return out << ":" << id.pid();
}
+ostream& operator <<(ostream& out, const cpg_name& name) {
+ return out << string(name.value, name.length);
+}
-}} // namespace qpid::cpg
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index d616be74e2..351b65d56a 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -170,9 +170,14 @@ class Cpg : public Dispatchable {
Handler& handler;
};
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+inline bool operator==(const cpg_name& a, const cpg_name& b) {
+ return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
+}
+inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
}} // namespace qpid::cluster