diff options
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 75f6651b0a..664a8b38cd 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -37,6 +37,7 @@ void JoiningHandler::configChange( cpg_address */*left*/, int nLeft, cpg_address */*joined*/, int /*nJoined*/) { + // FIXME aconway 2008-09-24: Called with lock held - volatile if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. QPID_LOG(notice, cluster.self << " first in cluster."); cluster.map.ready(cluster.self, cluster.url); @@ -53,9 +54,11 @@ void JoiningHandler::deliver(Event& e) { } void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { + Mutex::ScopedLock l(cluster.lock); cluster.map.update(members, dumper); QPID_LOG(debug, "Cluster update: " << cluster.map); checkDumpRequest(); + cluster.updateMemberStats(); } void JoiningHandler::checkDumpRequest() { @@ -67,6 +70,7 @@ void JoiningHandler::checkDumpRequest() { } void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { + Mutex::ScopedLock l(cluster.lock); if (cluster.map.dumper) { // Already a dump in progress. if (dumpee == cluster.self && state == DUMP_REQUESTED) state = START; // Need to make another request. @@ -96,11 +100,13 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { } void JoiningHandler::ready(const MemberId& id, const std::string& url) { + Mutex::ScopedLock l(cluster.lock); cluster.map.ready(id, Url(url)); checkDumpRequest(); } void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(cluster.lock); if (c->isCatchUp()) { ++catchUpConnections; QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections); @@ -109,6 +115,7 @@ void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { } void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(cluster.lock); QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1); if (c->isShadow()) cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); @@ -118,7 +125,7 @@ void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { void JoiningHandler::dumpComplete() { // FIXME aconway 2008-09-18: need to detect incomplete dump. - // + // Called with lock - volatile? if (state == STALLED) { QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling."); cluster.ready(); @@ -130,4 +137,5 @@ void JoiningHandler::dumpComplete() { } } + }} // namespace qpid::cluster |
