diff options
author | Samuel Just <sam.just@inktank.com> | 2013-03-21 18:46:50 -0700 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2013-03-21 18:46:50 -0700 |
commit | f00f3bc4e5db04be036ec737e4ed9d9281f64eb3 (patch) | |
tree | cb8a6e7d0af49fe06a4ba6243d0ce0088f835ae5 | |
parent | 6740d512ac12263f7bee370bc14b1179f83af5be (diff) | |
parent | fab0be1ffe308c46957d1b3ae3461b33ea7048e3 (diff) | |
download | ceph-f00f3bc4e5db04be036ec737e4ed9d9281f64eb3.tar.gz |
Merge remote-tracking branch 'upstream/wip_osd_shutdown_notification'
Fixes: #1857
Fixes: #4267
Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/Makefile.am | 1 | ||||
-rw-r--r-- | src/ceph_osd.cc | 11 | ||||
-rw-r--r-- | src/common/config_opts.h | 3 | ||||
-rw-r--r-- | src/common/shared_cache.hpp | 6 | ||||
-rw-r--r-- | src/messages/MOSDMarkMeDown.h | 69 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 1 | ||||
-rw-r--r-- | src/mon/OSDMonitor.cc | 105 | ||||
-rw-r--r-- | src/mon/OSDMonitor.h | 6 | ||||
-rw-r--r-- | src/msg/Message.cc | 4 | ||||
-rw-r--r-- | src/msg/Message.h | 1 | ||||
-rw-r--r-- | src/os/FileStore.cc | 9 | ||||
-rw-r--r-- | src/osd/OSD.cc | 311 | ||||
-rw-r--r-- | src/osd/OSD.h | 29 | ||||
-rw-r--r-- | src/osd/OpRequest.cc | 19 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 23 | ||||
-rw-r--r-- | src/osd/PG.h | 2 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 29 |
17 files changed, 463 insertions, 166 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index bcf899b432f..72037726c3d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1815,6 +1815,7 @@ noinst_HEADERS = \ messages/MOSDAlive.h\ messages/MOSDBoot.h\ messages/MOSDFailure.h\ + messages/MOSDMarkMeDown.h\ messages/MOSDMap.h\ messages/MOSDOp.h\ messages/MOSDOpReply.h\ diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 2965221d2b9..5a90abd6125 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -16,6 +16,7 @@ #include <sys/stat.h> #include <fcntl.h> #include <uuid/uuid.h> +#include <boost/scoped_ptr.hpp> #include <iostream> #include <string> @@ -337,8 +338,9 @@ int main(int argc, const char **argv) "(no journal)" : g_conf->osd_journal) << std::endl; - Throttle client_throttler(g_ceph_context, "osd_client_bytes", - g_conf->osd_client_message_size_cap); + boost::scoped_ptr<Throttle> client_throttler( + new Throttle(g_ceph_context, "osd_client_bytes", + g_conf->osd_client_message_size_cap)); uint64_t supported = CEPH_FEATURE_UID | @@ -347,7 +349,9 @@ int main(int argc, const char **argv) CEPH_FEATURE_MSG_AUTH; client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - client_messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, &client_throttler); // default, actually + client_messenger->set_policy_throttler( + entity_name_t::TYPE_CLIENT, + client_throttler.get()); // default, actually client_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | @@ -458,6 +462,7 @@ int main(int argc, const char **argv) delete messenger_hbclient; delete messenger_hbserver; delete cluster_messenger; + client_throttler.reset(); g_ceph_context->put(); // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 52880f59feb..f012091f870 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -436,6 +436,9 @@ OPTION(osd_failsafe_nearfull_ratio, OPT_FLOAT, .90) // what % full makes an OSD OPTION(osd_client_op_priority, OPT_INT, 63) OPTION(osd_recovery_op_priority, OPT_INT, 10) +// Max time to wait between notifying mon of shutdown and shutting down +OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5) + OPTION(filestore, OPT_BOOL, false) // Tests index failure paths diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp index 1bbfa0e354f..69a4c06dfbf 100644 --- a/src/common/shared_cache.hpp +++ b/src/common/shared_cache.hpp @@ -78,6 +78,12 @@ class SharedLRU { public: SharedLRU(size_t max_size = 20) : lock("SharedLRU::lock"), max_size(max_size) {} + + ~SharedLRU() { + contents.clear(); + lru.clear(); + assert(weak_refs.empty()); + } void set_size(size_t new_size) { list<VPtr> to_release; diff --git a/src/messages/MOSDMarkMeDown.h b/src/messages/MOSDMarkMeDown.h new file mode 100644 index 00000000000..e99c83d18dd --- /dev/null +++ b/src/messages/MOSDMarkMeDown.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MOSDMARKMEDOWN_H +#define CEPH_MOSDMARKMEDOWN_H + +#include "messages/PaxosServiceMessage.h" + +class MOSDMarkMeDown : public PaxosServiceMessage { + + static const int HEAD_VERSION = 1; + + public: + uuid_d fsid; + entity_inst_t target_osd; + epoch_t e; + bool ack; + + MOSDMarkMeDown() + : PaxosServiceMessage(MSG_OSD_MARK_ME_DOWN, 0, HEAD_VERSION) { } + MOSDMarkMeDown(const uuid_d &fs, const entity_inst_t& f, + epoch_t e, bool ack) + : PaxosServiceMessage(MSG_OSD_MARK_ME_DOWN, e, HEAD_VERSION), + fsid(fs), target_osd(f), ack(ack) {} + private: + ~MOSDMarkMeDown() {} + +public: + entity_inst_t get_target() { return target_osd; } + epoch_t get_epoch() { return e; } + + void decode_payload() { + bufferlist::iterator p = payload.begin(); + paxos_decode(p); + ::decode(fsid, p); + ::decode(target_osd, p); + ::decode(e, p); + ::decode(ack, p); + } + void encode_payload(uint64_t features) { + paxos_encode(); + ::encode(fsid, payload); + ::encode(target_osd, payload); + ::encode(e, payload); + ::encode(ack, payload); + } + + const char *get_type_name() const { return "osd_mark_me_down"; } + void print(ostream& out) const { + out << "osd_mark_me_down(" + << "ack=" << ack + << ", target_osd=" << target_osd + << ", fsid=" << fsid + << ")"; + } +}; + +#endif diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index bcebe3a1ccb..93f55607137 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -3071,6 +3071,7 @@ bool Monitor::_ms_dispatch(Message *m) break; // OSDs + case MSG_OSD_MARK_ME_DOWN: case MSG_OSD_FAILURE: case MSG_OSD_BOOT: case MSG_OSD_ALIVE: diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 8c4d1a43c77..d3cefae3803 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -25,6 +25,7 @@ #include "crush/CrushTester.h" #include "messages/MOSDFailure.h" +#include "messages/MOSDMarkMeDown.h" #include "messages/MOSDMap.h" #include "messages/MOSDBoot.h" #include "messages/MOSDAlive.h" @@ -540,6 +541,8 @@ bool OSDMonitor::preprocess_query(PaxosServiceMessage *m) return preprocess_command(static_cast<MMonCommand*>(m)); // damp updates + case MSG_OSD_MARK_ME_DOWN: + return preprocess_mark_me_down(static_cast<MOSDMarkMeDown*>(m)); case MSG_OSD_FAILURE: return preprocess_failure(static_cast<MOSDFailure*>(m)); case MSG_OSD_BOOT: @@ -568,6 +571,8 @@ bool OSDMonitor::prepare_update(PaxosServiceMessage *m) switch (m->get_type()) { // damp updates + case MSG_OSD_MARK_ME_DOWN: + return prepare_mark_me_down(static_cast<MOSDMarkMeDown*>(m)); case MSG_OSD_FAILURE: return prepare_failure(static_cast<MOSDFailure*>(m)); case MSG_OSD_BOOT: @@ -625,25 +630,33 @@ bool OSDMonitor::should_propose(double& delay) // failure -- -bool OSDMonitor::preprocess_failure(MOSDFailure *m) -{ - // who is target_osd - int badboy = m->get_target().name.num(); - +bool OSDMonitor::check_source(PaxosServiceMessage *m, uuid_d fsid) { // check permissions MonSession *session = m->get_session(); if (!session) - goto didit; + return true; if (!session->caps.check_privileges(PAXOS_OSDMAP, MON_CAP_X)) { - dout(0) << "got MOSDFailure from entity with insufficient caps " + dout(0) << "got osdmap change request from entity with insufficient caps " << session->caps << dendl; - goto didit; + return true; + } + if (fsid != mon->monmap->fsid) { + dout(0) << "check_source: on fsid " << fsid + << " != " << mon->monmap->fsid << dendl; + return true; } + return false; +} + - if (m->fsid != mon->monmap->fsid) { - dout(0) << "preprocess_failure on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl; +bool OSDMonitor::preprocess_failure(MOSDFailure *m) +{ + // who is target_osd + int badboy = m->get_target().name.num(); + + // check permissions + if (check_source(m, m->fsid)) goto didit; - } // first, verify the reporting host is valid if (m->get_orig_source().is_osd()) { @@ -656,7 +669,6 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) goto didit; } } - // weird? if (!osdmap.have_inst(badboy)) { @@ -694,6 +706,75 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m) return true; } +class C_AckMarkedDown : public Context { + OSDMonitor *osdmon; + MOSDMarkMeDown *m; +public: + C_AckMarkedDown( + OSDMonitor *osdmon, + MOSDMarkMeDown *m) + : osdmon(osdmon), m(m) {} + + void finish(int) { + osdmon->mon->send_reply( + m, + new MOSDMarkMeDown( + m->fsid, + m->get_target(), + m->get_epoch(), + m->ack)); + } + ~C_AckMarkedDown() { + m->put(); + } +}; + +bool OSDMonitor::preprocess_mark_me_down(MOSDMarkMeDown *m) +{ + int requesting_down = m->get_target().name.num(); + + // check permissions + if (check_source(m, m->fsid)) + goto didit; + + // first, verify the reporting host is valid + if (m->get_orig_source().is_osd()) { + int from = m->get_orig_source().num(); + if (!osdmap.exists(from) || + osdmap.get_addr(from) != m->get_orig_source_inst().addr || + osdmap.is_down(from)) { + dout(5) << "preprocess_mark_me_down from dead osd." + << from << ", ignoring" << dendl; + send_incremental(m, m->get_epoch()+1); + goto didit; + } + } + + // no down might be set + if (!can_mark_down(requesting_down)) + goto didit; + + dout(10) << "MOSDMarkMeDown for: " << m->get_target() << dendl; + return false; + + didit: + Context *c(new C_AckMarkedDown(this, m)); + c->complete(0); + return true; +} + +bool OSDMonitor::prepare_mark_me_down(MOSDMarkMeDown *m) +{ + int target_osd = m->get_target().name.num(); + + assert(osdmap.is_up(target_osd)); + assert(osdmap.get_addr(target_osd) == m->get_target().addr); + + pending_inc.new_state[target_osd] = CEPH_OSD_UP; + wait_for_finished_proposal(new C_AckMarkedDown(this, m)); + return true; +} + bool OSDMonitor::can_mark_down(int i) { if (osdmap.test_flag(CEPH_OSDMAP_NODOWN)) { diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 42d241ec2b1..15eb469c129 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -171,9 +171,15 @@ private: void remove_redundant_pg_temp(); void remove_down_pg_temp(); int reweight_by_utilization(int oload, std::string& out_str); + + bool check_source(PaxosServiceMessage *m, uuid_d fsid); + bool preprocess_mark_me_down(class MOSDMarkMeDown *m); + + friend class C_AckMarkedDown; bool preprocess_failure(class MOSDFailure *m); bool prepare_failure(class MOSDFailure *m); + bool prepare_mark_me_down(class MOSDMarkMeDown *m); void process_failures(); void kick_all_failures(); diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 4b3c16b49da..77be03a590b 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -58,6 +58,7 @@ using namespace std; #include "messages/MOSDAlive.h" #include "messages/MOSDPGTemp.h" #include "messages/MOSDFailure.h" +#include "messages/MOSDMarkMeDown.h" #include "messages/MOSDPing.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -371,6 +372,9 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot case MSG_OSD_FAILURE: m = new MOSDFailure(); break; + case MSG_OSD_MARK_ME_DOWN: + m = new MOSDMarkMeDown(); + break; case MSG_OSD_PING: m = new MOSDPing(); break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 274537db7e6..1bf28e36f2d 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -68,6 +68,7 @@ #define MSG_OSD_BOOT 71 #define MSG_OSD_FAILURE 72 #define MSG_OSD_ALIVE 73 +#define MSG_OSD_MARK_ME_DOWN 74 #define MSG_OSD_SUBOP 76 #define MSG_OSD_SUBOPREPLY 77 diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 7888c9fa395..5d3fd2e8e3c 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -2715,7 +2715,14 @@ bool FileStore::exists(coll_t cid, const hobject_t& oid) int FileStore::stat(coll_t cid, const hobject_t& oid, struct stat *st) { int r = lfn_stat(cid, oid, st); - dout(10) << "stat " << cid << "/" << oid << " = " << r << " (size " << st->st_size << ")" << dendl; + if (r < 0) { + dout(10) << "stat " << cid << "/" << oid + << " = " << r << dendl; + } else { + dout(10) << "stat " << cid << "/" << oid + << " = " << r + << " (size " << st->st_size << ")" << dendl; + } return r; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 576aeed08e2..978c24056f5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -55,6 +55,7 @@ #include "messages/MPing.h" #include "messages/MOSDPing.h" #include "messages/MOSDFailure.h" +#include "messages/MOSDMarkMeDown.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" #include "messages/MOSDSubOp.h" @@ -184,7 +185,9 @@ OSDService::OSDService(OSD *osd) : in_progress_split_lock("OSDService::in_progress_split_lock"), full_status_lock("OSDService::full_status_lock"), cur_state(NONE), - last_msg(0) + last_msg(0), + is_stopping_lock("OSDService::is_stopping_lock"), + state(NOT_STOPPING) {} void OSDService::_start_split(const set<pg_t> &pgs) @@ -258,9 +261,16 @@ void OSDService::pg_stat_queue_dequeue(PG *pg) void OSDService::shutdown() { reserver_finisher.stop(); - watch_lock.Lock(); - watch_timer.shutdown(); - watch_lock.Unlock(); + { + Mutex::Locker l(watch_lock); + watch_timer.shutdown(); + } + { + Mutex::Locker l(backfill_request_lock); + backfill_request_timer.shutdown(); + } + osdmap = OSDMapRef(); + next_osdmap = OSDMapRef(); } void OSDService::init() @@ -753,7 +763,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, const std::string &dev, const std::string &jdev) : Dispatcher(external_messenger->cct), osd_lock("OSD::osd_lock"), - timer(external_messenger->cct, osd_lock), + tick_timer(external_messenger->cct, osd_lock), authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(external_messenger->cct, cct->_conf->auth_supported.length() ? cct->_conf->auth_supported : @@ -831,12 +841,14 @@ void OSD::handle_signal(int signum) assert(signum == SIGINT || signum == SIGTERM); derr << "*** Got signal " << sys_siglist[signum] << " ***" << dendl; //suicide(128 + signum); - suicide(0); + shutdown(); } int OSD::pre_init() { Mutex::Locker lock(osd_lock); + if (is_stopping()) + return 0; assert(!store); store = create_object_store(dev_path, journal_path); @@ -906,8 +918,10 @@ public: int OSD::init() { Mutex::Locker lock(osd_lock); + if (is_stopping()) + return 0; - timer.init(); + tick_timer.init(); service.backfill_request_timer.init(); // mount. @@ -1015,7 +1029,7 @@ int OSD::init() heartbeat_thread.create(); // tick - timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this)); + tick_timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this)); AdminSocket *admin_socket = cct->get_admin_socket(); asok_hook = new OSDSocketHook(this); @@ -1053,6 +1067,8 @@ int OSD::init() monc->shutdown(); store->umount(); osd_lock.Lock(); // locker is going to unlock this on function exit + if (is_stopping()) + return 0; return r; } @@ -1061,6 +1077,8 @@ int OSD::init() } osd_lock.Lock(); + if (is_stopping()) + return 0; dout(10) << "ensuring pgs have consumed prior maps" << dendl; consume_map(); @@ -1167,37 +1185,47 @@ void OSD::suicide(int exitcode) int OSD::shutdown() { - g_conf->remove_observer(this); + if (!service.prepare_to_stop()) + return 0; // already shutting down + osd_lock.Lock(); + if (is_stopping()) { + osd_lock.Unlock(); + return 0; + } + derr << "shutdown" << dendl; - service.shutdown(); + heartbeat_lock.Lock(); + state = STATE_STOPPING; + heartbeat_lock.Unlock(); + + // Debugging g_ceph_context->_conf->set_val("debug_osd", "100"); g_ceph_context->_conf->set_val("debug_journal", "100"); g_ceph_context->_conf->set_val("debug_filestore", "100"); g_ceph_context->_conf->set_val("debug_ms", "100"); g_ceph_context->_conf->apply_changes(NULL); - derr << "shutdown" << dendl; - - state = STATE_STOPPING; - - timer.shutdown(); - - service.backfill_request_lock.Lock(); - service.backfill_request_timer.shutdown(); - service.backfill_request_lock.Unlock(); - - heartbeat_lock.Lock(); - heartbeat_stop = true; - heartbeat_cond.Signal(); - heartbeat_lock.Unlock(); - heartbeat_thread.join(); - - command_tp.stop(); - + // Remove PGs + for (hash_map<pg_t, PG*>::iterator p = pg_map.begin(); + p != pg_map.end(); + ++p) { + dout(20) << " kicking pg " << p->first << dendl; + p->second->lock(); + p->second->on_shutdown(); + p->second->kick(); + p->second->unlock(); + p->second->put(); + } + pg_map.clear(); + // finish ops - op_wq.drain(); - dout(10) << "no ops" << dendl; + op_wq.drain(); // should already be empty except for lagard PGs + { + Mutex::Locker l(finished_lock); + finished.clear(); // zap waiters (bleh, this is messy) + } + // unregister commands cct->get_admin_socket()->unregister_command("dump_ops_in_flight"); cct->get_admin_socket()->unregister_command("dump_historic_ops"); cct->get_admin_socket()->unregister_command("dump_op_pq_state"); @@ -1211,48 +1239,33 @@ int OSD::shutdown() delete test_ops_hook; test_ops_hook = NULL; + osd_lock.Unlock(); + + heartbeat_lock.Lock(); + heartbeat_stop = true; + heartbeat_cond.Signal(); + heartbeat_lock.Unlock(); + heartbeat_thread.join(); + + recovery_tp.drain(); recovery_tp.stop(); dout(10) << "recovery tp stopped" << dendl; + + op_tp.drain(); op_tp.stop(); dout(10) << "op tp stopped" << dendl; - // pause _new_ disk work first (to avoid racing with thread pool), - disk_tp.pause_new(); - dout(10) << "disk tp paused (new), kicking all pgs" << dendl; - - // then kick all pgs, - for (hash_map<pg_t, PG*>::iterator p = pg_map.begin(); - p != pg_map.end(); - ++p) { - dout(20) << " kicking pg " << p->first << dendl; - p->second->lock(); - p->second->kick(); - p->second->unlock(); - } - dout(20) << " kicked all pgs" << dendl; + command_tp.drain(); + command_tp.stop(); + dout(10) << "command tp stopped" << dendl; - // then stop thread. + disk_tp.drain(); disk_tp.stop(); - dout(10) << "disk tp stopped" << dendl; - - // tell pgs we're shutting down - for (hash_map<pg_t, PG*>::iterator p = pg_map.begin(); - p != pg_map.end(); - ++p) { - p->second->lock(); - p->second->on_shutdown(); - p->second->unlock(); - } + dout(10) << "disk tp paused (new), kicking all pgs" << dendl; - osd_lock.Unlock(); - store->sync(); - store->flush(); osd_lock.Lock(); - // zap waiters (bleh, this is messy) - finished_lock.Lock(); - finished.clear(); - finished_lock.Unlock(); + tick_timer.shutdown(); // note unmount epoch dout(10) << "noting clean unmount in epoch " << osdmap->get_epoch() << dendl; @@ -1266,33 +1279,32 @@ int OSD::shutdown() << cpp_strerror(r) << dendl; } - // flush data to disk - osd_lock.Unlock(); - dout(10) << "sync" << dendl; + dout(10) << "syncing store" << dendl; + store->flush(); store->sync(); - r = store->umount(); + store->umount(); delete store; store = 0; - dout(10) << "sync done" << dendl; - osd_lock.Lock(); - - clear_pg_stat_queue(); + dout(10) << "Store synced" << dendl; - // close pgs - for (hash_map<pg_t, PG*>::iterator p = pg_map.begin(); - p != pg_map.end(); - ++p) { - PG *pg = p->second; - pg->put(); + { + Mutex::Locker l(pg_stat_queue_lock); + assert(pg_stat_queue.empty()); } - pg_map.clear(); + + g_conf->remove_observer(this); + + monc->shutdown(); + osd_lock.Unlock(); + + osdmap = OSDMapRef(); + service.shutdown(); + op_tracker.on_shutdown(); client_messenger->shutdown(); cluster_messenger->shutdown(); hbclient_messenger->shutdown(); hbserver_messenger->shutdown(); - - monc->shutdown(); return r; } @@ -1522,20 +1534,6 @@ PG *OSD::_lookup_lock_pg_with_map_lock_held(pg_t pgid) return pg; } -PG *OSD::lookup_lock_raw_pg(pg_t pgid) -{ - Mutex::Locker l(osd_lock); - if (osdmap->have_pg_pool(pgid.pool())) { - pgid = osdmap->raw_pg_to_pg(pgid); - } - if (!_have_pg(pgid)) { - return NULL; - } - PG *pg = _lookup_lock_pg(pgid); - return pg; -} - - void OSD::load_pgs() { assert(osd_lock.is_locked()); @@ -2075,10 +2073,11 @@ void OSD::_add_heartbeat_peer(int p) void OSD::need_heartbeat_peer_update() { - heartbeat_lock.Lock(); + Mutex::Locker l(heartbeat_lock); + if (is_stopping()) + return; dout(20) << "need_heartbeat_peer_update" << dendl; heartbeat_need_update = true; - heartbeat_lock.Unlock(); } void OSD::maybe_update_heartbeat_peers() @@ -2130,15 +2129,15 @@ void OSD::maybe_update_heartbeat_peers() void OSD::reset_heartbeat_peers() { + assert(osd_lock.is_locked()); dout(10) << "reset_heartbeat_peers" << dendl; - heartbeat_lock.Lock(); + Mutex::Locker l(heartbeat_lock); while (!heartbeat_peers.empty()) { hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con); heartbeat_peers.begin()->second.con->put(); heartbeat_peers.erase(heartbeat_peers.begin()); } failure_queue.clear(); - heartbeat_lock.Unlock(); } void OSD::handle_osd_ping(MOSDPing *m) @@ -2153,6 +2152,10 @@ void OSD::handle_osd_ping(MOSDPing *m) int from = m->get_source().num(); heartbeat_lock.Lock(); + if (is_stopping()) { + heartbeat_lock.Unlock(); + return; + } OSDMapRef curmap = service.get_osdmap(); @@ -2252,7 +2255,9 @@ void OSD::handle_osd_ping(MOSDPing *m) void OSD::heartbeat_entry() { - heartbeat_lock.Lock(); + Mutex::Locker l(heartbeat_lock); + if (is_stopping()) + return; while (!heartbeat_stop) { heartbeat(); @@ -2261,9 +2266,10 @@ void OSD::heartbeat_entry() w.set_from_double(wait); dout(30) << "heartbeat_entry sleeping for " << wait << dendl; heartbeat_cond.WaitInterval(g_ceph_context, heartbeat_lock, w); + if (is_stopping()) + return; dout(30) << "heartbeat_entry woke up" << dendl; } - heartbeat_lock.Unlock(); } void OSD::heartbeat_check() @@ -2367,6 +2373,10 @@ bool OSD::heartbeat_reset(Connection *con) HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv()); if (s) { heartbeat_lock.Lock(); + if (is_stopping()) { + heartbeat_lock.Unlock(); + return true; + } map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer); if (p != heartbeat_peers.end() && p->second.con == con) { @@ -2458,7 +2468,7 @@ void OSD::tick() check_ops_in_flight(); - timer.add_event_after(1.0, new C_Tick(this)); + tick_timer.add_event_after(1.0, new C_Tick(this)); } void OSD::check_ops_in_flight() @@ -2646,6 +2656,8 @@ void OSD::ms_handle_connect(Connection *con) { if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { Mutex::Locker l(osd_lock); + if (is_stopping()) + return; dout(10) << "ms_handle_connect on mon" << dendl; if (is_booting()) { start_boot(); @@ -2693,6 +2705,8 @@ void OSD::start_boot() void OSD::_maybe_boot(epoch_t oldest, epoch_t newest) { Mutex::Locker l(osd_lock); + if (is_stopping()) + return; dout(10) << "_maybe_boot mon has osdmaps " << oldest << ".." << newest << dendl; if (is_initializing()) { @@ -2855,6 +2869,7 @@ void OSDService::send_pg_temp() void OSD::send_failures() { + assert(osd_lock.is_locked()); bool locked = false; if (!failure_queue.empty()) { heartbeat_lock.Lock(); @@ -3359,7 +3374,6 @@ void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map) bool OSD::heartbeat_dispatch(Message *m) { dout(30) << "heartbeat_dispatch " << m << dendl; - switch (m->get_type()) { case CEPH_MSG_PING: @@ -3389,9 +3403,18 @@ bool OSD::heartbeat_dispatch(Message *m) bool OSD::ms_dispatch(Message *m) { + if (m->get_type() == MSG_OSD_MARK_ME_DOWN) { + service.got_stop_ack(); + return true; + } + // lock! osd_lock.Lock(); + if (is_stopping()) { + osd_lock.Unlock(); + return true; + } while (dispatch_running) { dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl; @@ -3845,6 +3868,37 @@ void OSDService::dec_scrubs_active() sched_scrub_lock.Unlock(); } +bool OSDService::prepare_to_stop() { + Mutex::Locker l(is_stopping_lock); + if (state != NOT_STOPPING) + return false; + + state = PREPARING_TO_STOP; + monc->send_mon_message( + new MOSDMarkMeDown( + monc->get_fsid(), + get_osdmap()->get_inst(whoami), + get_osdmap()->get_epoch(), + false + )); + utime_t now = ceph_clock_now(g_ceph_context); + utime_t timeout; + timeout.set_from_double( + now + g_conf->osd_mon_shutdown_timeout); + while ((ceph_clock_now(g_ceph_context) < timeout) && + (state != STOPPING)) { + is_stopping_cond.WaitUntil(is_stopping_lock, timeout); + } + state = STOPPING; + return true; +} + +void OSDService::got_stop_ack() { + Mutex::Locker l(is_stopping_lock); + dout(10) << "Got stop ack" << dendl; + state = STOPPING; + is_stopping_cond.Signal(); +} // ===================================================== // MAP @@ -3867,6 +3921,7 @@ void OSD::wait_for_new_map(OpRequestRef op) void OSD::note_down_osd(int peer) { + assert(osd_lock.is_locked()); cluster_messenger->mark_down(osdmap->get_cluster_addr(peer)); heartbeat_lock.Lock(); @@ -4090,9 +4145,14 @@ void OSD::handle_osd_map(MOSDMap *m) !osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) || !osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) || !osdmap->get_hb_addr(whoami).probably_equals(hbserver_messenger->get_myaddr())) { - if (!osdmap->is_up(whoami)) - clog.warn() << "map e" << osdmap->get_epoch() - << " wrongly marked me down"; + if (!osdmap->is_up(whoami)) { + if (service.is_preparing_to_stop()) { + service.got_stop_ack(); + } else { + clog.warn() << "map e" << osdmap->get_epoch() + << " wrongly marked me down"; + } + } else if (!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr())) clog.error() << "map e" << osdmap->get_epoch() << " had wrong client addr (" << osdmap->get_addr(whoami) @@ -4106,25 +4166,27 @@ void OSD::handle_osd_map(MOSDMap *m) << " had wrong hb addr (" << osdmap->get_hb_addr(whoami) << " != my " << hbserver_messenger->get_myaddr() << ")"; - state = STATE_BOOTING; - up_epoch = 0; - do_restart = true; - bind_epoch = osdmap->get_epoch(); + if (!service.is_stopping()) { + state = STATE_BOOTING; + up_epoch = 0; + do_restart = true; + bind_epoch = osdmap->get_epoch(); - int cport = cluster_messenger->get_myaddr().get_port(); - int hbport = hbserver_messenger->get_myaddr().get_port(); + int cport = cluster_messenger->get_myaddr().get_port(); + int hbport = hbserver_messenger->get_myaddr().get_port(); - int r = cluster_messenger->rebind(hbport); - if (r != 0) - do_shutdown = true; // FIXME: do_restart? + int r = cluster_messenger->rebind(hbport); + if (r != 0) + do_shutdown = true; // FIXME: do_restart? - r = hbserver_messenger->rebind(cport); - if (r != 0) - do_shutdown = true; // FIXME: do_restart? + r = hbserver_messenger->rebind(cport); + if (r != 0) + do_shutdown = true; // FIXME: do_restart? - hbclient_messenger->mark_down_all(); + hbclient_messenger->mark_down_all(); - reset_heartbeat_peers(); + reset_heartbeat_peers(); + } } } @@ -5733,17 +5795,6 @@ void OSD::_remove_pg(PG *pg) *i, pg->osr, deleting)); } - recovery_wq.dequeue(pg); - scrub_wq.dequeue(pg); - scrub_finalize_wq.dequeue(pg); - snap_trim_wq.dequeue(pg); - pg_stat_queue_dequeue(pg); - op_wq.dequeue(pg); - peering_wq.dequeue(pg); - - pg->deleting = true; - - pg->unreg_next_scrub(); // remove from map pg_map.erase(pg->info.pgid); @@ -6309,6 +6360,8 @@ struct C_CompleteSplits : public Context { : osd(osd), pgs(in) {} void finish(int r) { Mutex::Locker l(osd->osd_lock); + if (osd->is_stopping()) + return; PG::RecoveryCtx rctx = osd->create_context(); set<pg_t> to_complete; for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 1ce80e1414e..148b761f532 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -406,6 +406,24 @@ public: void check_nearfull_warning(const osd_stat_t &stat); bool check_failsafe_full(); + // -- stopping -- + Mutex is_stopping_lock; + Cond is_stopping_cond; + enum { + NOT_STOPPING, + PREPARING_TO_STOP, + STOPPING } state; + bool is_stopping() { + Mutex::Locker l(is_stopping_lock); + return state == STOPPING; + } + bool is_preparing_to_stop() { + Mutex::Locker l(is_stopping_lock); + return state == PREPARING_TO_STOP; + } + bool prepare_to_stop(); + void got_stop_ack(); + OSDService(OSD *osd); }; class OSD : public Dispatcher, @@ -419,7 +437,7 @@ public: protected: Mutex osd_lock; // global lock - SafeTimer timer; // safe timer (osd_lock) + SafeTimer tick_timer; // safe timer (osd_lock) AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry; AuthAuthorizeHandlerRegistry *authorize_handler_service_registry; @@ -890,7 +908,6 @@ protected: ObjectStore::Transaction& t); PG *_lookup_qlock_pg(pg_t pgid); - PG *lookup_lock_raw_pg(pg_t pgid); PG* _make_pg(OSDMapRef createmap, pg_t pgid); void add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx); @@ -1100,6 +1117,10 @@ protected: } void _process(Command *c) { osd->osd_lock.Lock(); + if (osd->is_stopping()) { + delete c; + return; + } osd->do_command(c->con, c->tid, c->cmd, c->indata); osd->osd_lock.Unlock(); delete c; @@ -1354,6 +1375,10 @@ protected: } void _process(MOSDRepScrub *msg) { osd->osd_lock.Lock(); + if (osd->is_stopping()) { + osd->osd_lock.Unlock(); + return; + } if (osd->_have_pg(msg->pgid)) { PG *pg = osd->_lookup_lock_pg(msg->pgid); osd->osd_lock.Unlock(); diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index b3d95367ec5..d5ce8bbc749 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -20,8 +20,16 @@ static ostream& _prefix(std::ostream* _dout) return *_dout << "--OSD::tracker-- "; } -void OpHistory::insert(utime_t now, OpRequest *op) +void OpHistory::on_shutdown() { + arrived.clear(); + duration.clear(); + shutdown = true; +} + +void OpHistory::insert(utime_t now, OpRequestRef op) +{ + assert(!shutdown); duration.insert(make_pair(op->get_duration(), op)); arrived.insert(make_pair(op->get_arrived(), op)); cleanup(now); @@ -30,11 +38,11 @@ void OpHistory::insert(utime_t now, OpRequest *op) void OpHistory::cleanup(utime_t now) { while (arrived.size() && - now - arrived.begin()->first > (double)(g_conf->osd_op_history_duration)) { + (now - arrived.begin()->first > + (double)(g_conf->osd_op_history_duration))) { duration.erase(make_pair( arrived.begin()->second->get_duration(), arrived.begin()->second)); - delete arrived.begin()->second; arrived.erase(arrived.begin()); } @@ -42,7 +50,6 @@ void OpHistory::cleanup(utime_t now) arrived.erase(make_pair( duration.begin()->second->get_arrived(), duration.begin()->second)); - delete duration.begin()->second; duration.erase(duration.begin()); } } @@ -55,7 +62,7 @@ void OpHistory::dump_ops(utime_t now, Formatter *f) f->dump_int("duration to keep", g_conf->osd_op_history_duration); { f->open_array_section("Ops"); - for (set<pair<utime_t, const OpRequest *> >::const_iterator i = + for (set<pair<utime_t, OpRequestRef> >::const_iterator i = arrived.begin(); i != arrived.end(); ++i) { @@ -109,7 +116,7 @@ void OpTracker::unregister_inflight_op(OpRequest *i) utime_t now = ceph_clock_now(g_ceph_context); i->xitem.remove_myself(); i->request->clear_data(); - history.insert(now, i); + history.insert(now, OpRequestRef(i)); } bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector) diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index e12fbe00c53..ca419f34ff8 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -26,18 +26,24 @@ #include "osd/osd_types.h" class OpRequest; +typedef std::tr1::shared_ptr<OpRequest> OpRequestRef; class OpHistory { - set<pair<utime_t, const OpRequest *> > arrived; - set<pair<double, const OpRequest *> > duration; + set<pair<utime_t, OpRequestRef> > arrived; + set<pair<double, OpRequestRef> > duration; void cleanup(utime_t now); + bool shutdown; public: - void insert(utime_t now, OpRequest *op); + OpHistory() : shutdown(false) {} + ~OpHistory() { + assert(arrived.empty()); + assert(duration.empty()); + } + void insert(utime_t now, OpRequestRef op); void dump_ops(utime_t now, Formatter *f); + void on_shutdown(); }; -class OpRequest; -typedef std::tr1::shared_ptr<OpRequest> OpRequestRef; class OpTracker { class RemoveOnDelete { OpTracker *tracker; @@ -70,6 +76,13 @@ public: void mark_event(OpRequest *op, const string &evt); void _mark_event(OpRequest *op, const string &evt, utime_t now); OpRequestRef create_request(Message *req); + void on_shutdown() { + Mutex::Locker l(ops_in_flight_lock); + history.on_shutdown(); + } + ~OpTracker() { + assert(ops_in_flight.empty()); + } }; /** diff --git a/src/osd/PG.h b/src/osd/PG.h index 658eb2d0ac9..b24c74f8bf5 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -389,7 +389,7 @@ protected: atomic_t ref; public: - bool deleting; // true while RemoveWQ should be chewing on us + bool deleting; // true while in removing or OSD is shutting down void lock(bool no_lockdep = false); void unlock() { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index f87e334f48f..50318fc5869 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6162,21 +6162,36 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) void ReplicatedPG::on_removal() { dout(10) << "on_removal" << dendl; - apply_and_flush_repops(false); - context_registry_on_change(); - clear_primary_state(); - osd->remove_want_pg_temp(info.pgid); - cancel_recovery(); - osd->remote_reserver.cancel_reservation(info.pgid); - osd->local_reserver.cancel_reservation(info.pgid); + on_shutdown(); } void ReplicatedPG::on_shutdown() { dout(10) << "on_shutdown" << dendl; + + // remove from queues + osd->recovery_wq.dequeue(this); + osd->scrub_wq.dequeue(this); + osd->scrub_finalize_wq.dequeue(this); + osd->snap_trim_wq.dequeue(this); + osd->pg_stat_queue_dequeue(this); + osd->dequeue_pg(this, 0); + osd->peering_wq.dequeue(this); + + // handles queue races + deleting = true; + + unreg_next_scrub(); apply_and_flush_repops(false); context_registry_on_change(); + + osd->remote_reserver.cancel_reservation(info.pgid); + osd->local_reserver.cancel_reservation(info.pgid); + + clear_primary_state(); + osd->remove_want_pg_temp(info.pgid); + cancel_recovery(); } void ReplicatedPG::on_flushed() |