summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/JoiningHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp124
1 files changed, 0 insertions, 124 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
deleted file mode 100644
index dbee0ece61..0000000000
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "JoiningHandler.h"
-#include "Cluster.h"
-#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace sys;
-using namespace framing;
-
-JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {}
-
-void JoiningHandler::configChange(
- cpg_address *current, int nCurrent,
- cpg_address */*left*/, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- // FIXME aconway 2008-09-24: Called with lock held - volatile
- if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
- QPID_LOG(notice, cluster.self << " first in cluster.");
- cluster.map.ready(cluster.self, cluster.url);
- cluster.updateMemberStats();
- cluster.unstall();
- }
-}
-
-void JoiningHandler::deliver(Event& e) {
- Mutex::ScopedLock l(cluster.lock);
- // Discard connection events unless we are stalled to receive a dump.
- if (state == STALLED)
- cluster.connectionEventQueue.push(e);
- else
- QPID_LOG(trace, "Discarded pre-join event " << e);
-}
-
-void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.update(members, dumper)) cluster.updateMemberStats();
- checkDumpRequest();
-}
-
-void JoiningHandler::checkDumpRequest() { // Call with lock held
- if (state == START && !cluster.map.dumper) {
- cluster.broker.getPort(); // ensure the broker is listening.
- state = DUMP_REQUESTED;
- cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0);
- }
-}
-
-void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.dumper) { // Already a dump in progress.
- if (dumpee == cluster.self && state == DUMP_REQUESTED)
- state = START; // Need to make another request.
- }
- else { // Start a new dump
- cluster.map.dumper = cluster.map.first();
- QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee);
- if (dumpee == cluster.self) { // My turn
- switch (state) {
- case START:
- case STALLED:
- assert(0); break;
-
- case DUMP_REQUESTED:
- QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper);
- state = STALLED;
- cluster.stall();
- break;
-
- case DUMP_COMPLETE:
- QPID_LOG(debug, cluster.self << " at start point and dump complete, ready.");
- cluster.ready();
- break;
- }
- }
- }
-}
-
-void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.ready(id, Url(urlStr)))
- cluster.updateMemberStats();
- checkDumpRequest();
-}
-
-void JoiningHandler::dumpComplete() {
- Mutex::ScopedLock l(cluster.lock);
- if (state == STALLED) {
- QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
- cluster.ready();
- }
- else {
- QPID_LOG(debug, cluster.self << " received dump, waiting for start point.");
- assert(state == DUMP_REQUESTED);
- state = DUMP_COMPLETE;
- }
- // FIXME aconway 2008-09-18: need to detect incomplete dump.
-}
-
-
-}} // namespace qpid::cluster