From b210ae04b88afa38238ca56e474ca60533f2768b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 18 Sep 2008 20:18:29 +0000 Subject: Dump shared state to new cluster members. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@696788 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/JoiningHandler.cpp | 37 +++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp') diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 3358e3404b..c188fe438e 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -30,7 +30,7 @@ namespace cluster { using namespace sys; using namespace framing; -JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {} +JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {} void JoiningHandler::configChange( cpg_address *current, int nCurrent, @@ -74,21 +74,17 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { else { // Start a new dump cluster.map.dumper = cluster.map.first(); if (dumpee == cluster.self) { // My turn - - state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump - - QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper); switch (state) { case START: case STALLED: assert(0); break; case DUMP_REQUESTED: + QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper); state = STALLED; cluster.stall(); break; - // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state. case DUMP_COMPLETE: cluster.ready(); break; @@ -102,5 +98,34 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) { checkDumpRequest(); } +void JoiningHandler::insert(const boost::intrusive_ptr& c) { + if (c->isCatchUp()) { + ++catchUpConnections; + QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections."); + } + else if (c->isExCatchUp()) { + if (c->getId().getConnectionPtr() != c.get()) // become shadow connection + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining"); + if (--catchUpConnections == 0) + dumpComplete(); + } + else // Local connection, will be stalled till dump complete. + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} + +void JoiningHandler::dumpComplete() { + // FIXME aconway 2008-09-18: need to detect incomplete dump. + // + if (state == STALLED) { + QPID_LOG(debug, "Dump complete, unstalling."); + cluster.ready(); + } + else { + QPID_LOG(debug, "Dump complete, waiting for stall point."); + assert(state == DUMP_REQUESTED); + state = DUMP_COMPLETE; + } +} }} // namespace qpid::cluster -- cgit v1.2.1