diff options
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 124 |
1 files changed, 0 insertions, 124 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp deleted file mode 100644 index dbee0ece61..0000000000 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "JoiningHandler.h" -#include "Cluster.h" -#include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace sys; -using namespace framing; - -JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {} - -void JoiningHandler::configChange( - cpg_address *current, int nCurrent, - cpg_address */*left*/, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - // FIXME aconway 2008-09-24: Called with lock held - volatile - if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. - QPID_LOG(notice, cluster.self << " first in cluster."); - cluster.map.ready(cluster.self, cluster.url); - cluster.updateMemberStats(); - cluster.unstall(); - } -} - -void JoiningHandler::deliver(Event& e) { - Mutex::ScopedLock l(cluster.lock); - // Discard connection events unless we are stalled to receive a dump. - if (state == STALLED) - cluster.connectionEventQueue.push(e); - else - QPID_LOG(trace, "Discarded pre-join event " << e); -} - -void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.update(members, dumper)) cluster.updateMemberStats(); - checkDumpRequest(); -} - -void JoiningHandler::checkDumpRequest() { // Call with lock held - if (state == START && !cluster.map.dumper) { - cluster.broker.getPort(); // ensure the broker is listening. - state = DUMP_REQUESTED; - cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0); - } -} - -void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.dumper) { // Already a dump in progress. - if (dumpee == cluster.self && state == DUMP_REQUESTED) - state = START; // Need to make another request. - } - else { // Start a new dump - cluster.map.dumper = cluster.map.first(); - QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee); - if (dumpee == cluster.self) { // My turn - switch (state) { - case START: - case STALLED: - assert(0); break; - - case DUMP_REQUESTED: - QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper); - state = STALLED; - cluster.stall(); - break; - - case DUMP_COMPLETE: - QPID_LOG(debug, cluster.self << " at start point and dump complete, ready."); - cluster.ready(); - break; - } - } - } -} - -void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.ready(id, Url(urlStr))) - cluster.updateMemberStats(); - checkDumpRequest(); -} - -void JoiningHandler::dumpComplete() { - Mutex::ScopedLock l(cluster.lock); - if (state == STALLED) { - QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling."); - cluster.ready(); - } - else { - QPID_LOG(debug, cluster.self << " received dump, waiting for start point."); - assert(state == DUMP_REQUESTED); - state = DUMP_COMPLETE; - } - // FIXME aconway 2008-09-18: need to detect incomplete dump. -} - - -}} // namespace qpid::cluster |
