summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/JoiningHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp10
1 files changed, 9 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index 75f6651b0a..664a8b38cd 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -37,6 +37,7 @@ void JoiningHandler::configChange(
cpg_address */*left*/, int nLeft,
cpg_address */*joined*/, int /*nJoined*/)
{
+ // FIXME aconway 2008-09-24: Called with lock held - volatile
if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
QPID_LOG(notice, cluster.self << " first in cluster.");
cluster.map.ready(cluster.self, cluster.url);
@@ -53,9 +54,11 @@ void JoiningHandler::deliver(Event& e) {
}
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.update(members, dumper);
QPID_LOG(debug, "Cluster update: " << cluster.map);
checkDumpRequest();
+ cluster.updateMemberStats();
}
void JoiningHandler::checkDumpRequest() {
@@ -67,6 +70,7 @@ void JoiningHandler::checkDumpRequest() {
}
void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+ Mutex::ScopedLock l(cluster.lock);
if (cluster.map.dumper) { // Already a dump in progress.
if (dumpee == cluster.self && state == DUMP_REQUESTED)
state = START; // Need to make another request.
@@ -96,11 +100,13 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
}
void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.ready(id, Url(url));
checkDumpRequest();
}
void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
if (c->isCatchUp()) {
++catchUpConnections;
QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
@@ -109,6 +115,7 @@ void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
}
void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
@@ -118,7 +125,7 @@ void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
- //
+ // Called with lock - volatile?
if (state == STALLED) {
QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
@@ -130,4 +137,5 @@ void JoiningHandler::dumpComplete() {
}
}
+
}} // namespace qpid::cluster