summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-03-21 18:46:50 -0700
committerSamuel Just <sam.just@inktank.com>2013-03-21 18:46:50 -0700
commitf00f3bc4e5db04be036ec737e4ed9d9281f64eb3 (patch)
treecb8a6e7d0af49fe06a4ba6243d0ce0088f835ae5
parent6740d512ac12263f7bee370bc14b1179f83af5be (diff)
parentfab0be1ffe308c46957d1b3ae3461b33ea7048e3 (diff)
downloadceph-f00f3bc4e5db04be036ec737e4ed9d9281f64eb3.tar.gz
Merge remote-tracking branch 'upstream/wip_osd_shutdown_notification'
Fixes: #1857 Fixes: #4267 Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/Makefile.am1
-rw-r--r--src/ceph_osd.cc11
-rw-r--r--src/common/config_opts.h3
-rw-r--r--src/common/shared_cache.hpp6
-rw-r--r--src/messages/MOSDMarkMeDown.h69
-rw-r--r--src/mon/Monitor.cc1
-rw-r--r--src/mon/OSDMonitor.cc105
-rw-r--r--src/mon/OSDMonitor.h6
-rw-r--r--src/msg/Message.cc4
-rw-r--r--src/msg/Message.h1
-rw-r--r--src/os/FileStore.cc9
-rw-r--r--src/osd/OSD.cc311
-rw-r--r--src/osd/OSD.h29
-rw-r--r--src/osd/OpRequest.cc19
-rw-r--r--src/osd/OpRequest.h23
-rw-r--r--src/osd/PG.h2
-rw-r--r--src/osd/ReplicatedPG.cc29
17 files changed, 463 insertions, 166 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index bcf899b432f..72037726c3d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1815,6 +1815,7 @@ noinst_HEADERS = \
messages/MOSDAlive.h\
messages/MOSDBoot.h\
messages/MOSDFailure.h\
+ messages/MOSDMarkMeDown.h\
messages/MOSDMap.h\
messages/MOSDOp.h\
messages/MOSDOpReply.h\
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index 2965221d2b9..5a90abd6125 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -16,6 +16,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <uuid/uuid.h>
+#include <boost/scoped_ptr.hpp>
#include <iostream>
#include <string>
@@ -337,8 +338,9 @@ int main(int argc, const char **argv)
"(no journal)" : g_conf->osd_journal)
<< std::endl;
- Throttle client_throttler(g_ceph_context, "osd_client_bytes",
- g_conf->osd_client_message_size_cap);
+ boost::scoped_ptr<Throttle> client_throttler(
+ new Throttle(g_ceph_context, "osd_client_bytes",
+ g_conf->osd_client_message_size_cap));
uint64_t supported =
CEPH_FEATURE_UID |
@@ -347,7 +349,9 @@ int main(int argc, const char **argv)
CEPH_FEATURE_MSG_AUTH;
client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
- client_messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, &client_throttler); // default, actually
+ client_messenger->set_policy_throttler(
+ entity_name_t::TYPE_CLIENT,
+ client_throttler.get()); // default, actually
client_messenger->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported,
CEPH_FEATURE_UID |
@@ -458,6 +462,7 @@ int main(int argc, const char **argv)
delete messenger_hbclient;
delete messenger_hbserver;
delete cluster_messenger;
+ client_throttler.reset();
g_ceph_context->put();
// cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 52880f59feb..f012091f870 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -436,6 +436,9 @@ OPTION(osd_failsafe_nearfull_ratio, OPT_FLOAT, .90) // what % full makes an OSD
OPTION(osd_client_op_priority, OPT_INT, 63)
OPTION(osd_recovery_op_priority, OPT_INT, 10)
+// Max time to wait between notifying mon of shutdown and shutting down
+OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
+
OPTION(filestore, OPT_BOOL, false)
// Tests index failure paths
diff --git a/src/common/shared_cache.hpp b/src/common/shared_cache.hpp
index 1bbfa0e354f..69a4c06dfbf 100644
--- a/src/common/shared_cache.hpp
+++ b/src/common/shared_cache.hpp
@@ -78,6 +78,12 @@ class SharedLRU {
public:
SharedLRU(size_t max_size = 20) : lock("SharedLRU::lock"), max_size(max_size) {}
+
+ ~SharedLRU() {
+ contents.clear();
+ lru.clear();
+ assert(weak_refs.empty());
+ }
void set_size(size_t new_size) {
list<VPtr> to_release;
diff --git a/src/messages/MOSDMarkMeDown.h b/src/messages/MOSDMarkMeDown.h
new file mode 100644
index 00000000000..e99c83d18dd
--- /dev/null
+++ b/src/messages/MOSDMarkMeDown.h
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MOSDMARKMEDOWN_H
+#define CEPH_MOSDMARKMEDOWN_H
+
+#include "messages/PaxosServiceMessage.h"
+
+class MOSDMarkMeDown : public PaxosServiceMessage {
+
+ static const int HEAD_VERSION = 1;
+
+ public:
+ uuid_d fsid;
+ entity_inst_t target_osd;
+ epoch_t e;
+ bool ack;
+
+ MOSDMarkMeDown()
+ : PaxosServiceMessage(MSG_OSD_MARK_ME_DOWN, 0, HEAD_VERSION) { }
+ MOSDMarkMeDown(const uuid_d &fs, const entity_inst_t& f,
+ epoch_t e, bool ack)
+ : PaxosServiceMessage(MSG_OSD_MARK_ME_DOWN, e, HEAD_VERSION),
+ fsid(fs), target_osd(f), ack(ack) {}
+ private:
+ ~MOSDMarkMeDown() {}
+
+public:
+ entity_inst_t get_target() { return target_osd; }
+ epoch_t get_epoch() { return e; }
+
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ paxos_decode(p);
+ ::decode(fsid, p);
+ ::decode(target_osd, p);
+ ::decode(e, p);
+ ::decode(ack, p);
+ }
+ void encode_payload(uint64_t features) {
+ paxos_encode();
+ ::encode(fsid, payload);
+ ::encode(target_osd, payload);
+ ::encode(e, payload);
+ ::encode(ack, payload);
+ }
+
+ const char *get_type_name() const { return "osd_mark_me_down"; }
+ void print(ostream& out) const {
+ out << "osd_mark_me_down("
+ << "ack=" << ack
+ << ", target_osd=" << target_osd
+ << ", fsid=" << fsid
+ << ")";
+ }
+};
+
+#endif
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index bcebe3a1ccb..93f55607137 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -3071,6 +3071,7 @@ bool Monitor::_ms_dispatch(Message *m)
break;
// OSDs
+ case MSG_OSD_MARK_ME_DOWN:
case MSG_OSD_FAILURE:
case MSG_OSD_BOOT:
case MSG_OSD_ALIVE:
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index 8c4d1a43c77..d3cefae3803 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -25,6 +25,7 @@
#include "crush/CrushTester.h"
#include "messages/MOSDFailure.h"
+#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDAlive.h"
@@ -540,6 +541,8 @@ bool OSDMonitor::preprocess_query(PaxosServiceMessage *m)
return preprocess_command(static_cast<MMonCommand*>(m));
// damp updates
+ case MSG_OSD_MARK_ME_DOWN:
+ return preprocess_mark_me_down(static_cast<MOSDMarkMeDown*>(m));
case MSG_OSD_FAILURE:
return preprocess_failure(static_cast<MOSDFailure*>(m));
case MSG_OSD_BOOT:
@@ -568,6 +571,8 @@ bool OSDMonitor::prepare_update(PaxosServiceMessage *m)
switch (m->get_type()) {
// damp updates
+ case MSG_OSD_MARK_ME_DOWN:
+ return prepare_mark_me_down(static_cast<MOSDMarkMeDown*>(m));
case MSG_OSD_FAILURE:
return prepare_failure(static_cast<MOSDFailure*>(m));
case MSG_OSD_BOOT:
@@ -625,25 +630,33 @@ bool OSDMonitor::should_propose(double& delay)
// failure --
-bool OSDMonitor::preprocess_failure(MOSDFailure *m)
-{
- // who is target_osd
- int badboy = m->get_target().name.num();
-
+bool OSDMonitor::check_source(PaxosServiceMessage *m, uuid_d fsid) {
// check permissions
MonSession *session = m->get_session();
if (!session)
- goto didit;
+ return true;
if (!session->caps.check_privileges(PAXOS_OSDMAP, MON_CAP_X)) {
- dout(0) << "got MOSDFailure from entity with insufficient caps "
+ dout(0) << "got osdmap change request from entity with insufficient caps "
<< session->caps << dendl;
- goto didit;
+ return true;
+ }
+ if (fsid != mon->monmap->fsid) {
+ dout(0) << "check_source: on fsid " << fsid
+ << " != " << mon->monmap->fsid << dendl;
+ return true;
}
+ return false;
+}
+
- if (m->fsid != mon->monmap->fsid) {
- dout(0) << "preprocess_failure on fsid " << m->fsid << " != " << mon->monmap->fsid << dendl;
+bool OSDMonitor::preprocess_failure(MOSDFailure *m)
+{
+ // who is target_osd
+ int badboy = m->get_target().name.num();
+
+ // check permissions
+ if (check_source(m, m->fsid))
goto didit;
- }
// first, verify the reporting host is valid
if (m->get_orig_source().is_osd()) {
@@ -656,7 +669,6 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m)
goto didit;
}
}
-
// weird?
if (!osdmap.have_inst(badboy)) {
@@ -694,6 +706,75 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m)
return true;
}
+class C_AckMarkedDown : public Context {
+ OSDMonitor *osdmon;
+ MOSDMarkMeDown *m;
+public:
+ C_AckMarkedDown(
+ OSDMonitor *osdmon,
+ MOSDMarkMeDown *m)
+ : osdmon(osdmon), m(m) {}
+
+ void finish(int) {
+ osdmon->mon->send_reply(
+ m,
+ new MOSDMarkMeDown(
+ m->fsid,
+ m->get_target(),
+ m->get_epoch(),
+ m->ack));
+ }
+ ~C_AckMarkedDown() {
+ m->put();
+ }
+};
+
+bool OSDMonitor::preprocess_mark_me_down(MOSDMarkMeDown *m)
+{
+ int requesting_down = m->get_target().name.num();
+
+ // check permissions
+ if (check_source(m, m->fsid))
+ goto didit;
+
+ // first, verify the reporting host is valid
+ if (m->get_orig_source().is_osd()) {
+ int from = m->get_orig_source().num();
+ if (!osdmap.exists(from) ||
+ osdmap.get_addr(from) != m->get_orig_source_inst().addr ||
+ osdmap.is_down(from)) {
+ dout(5) << "preprocess_mark_me_down from dead osd."
+ << from << ", ignoring" << dendl;
+ send_incremental(m, m->get_epoch()+1);
+ goto didit;
+ }
+ }
+
+ // no down might be set
+ if (!can_mark_down(requesting_down))
+ goto didit;
+
+ dout(10) << "MOSDMarkMeDown for: " << m->get_target() << dendl;
+ return false;
+
+ didit:
+ Context *c(new C_AckMarkedDown(this, m));
+ c->complete(0);
+ return true;
+}
+
+bool OSDMonitor::prepare_mark_me_down(MOSDMarkMeDown *m)
+{
+ int target_osd = m->get_target().name.num();
+
+ assert(osdmap.is_up(target_osd));
+ assert(osdmap.get_addr(target_osd) == m->get_target().addr);
+
+ pending_inc.new_state[target_osd] = CEPH_OSD_UP;
+ wait_for_finished_proposal(new C_AckMarkedDown(this, m));
+ return true;
+}
+
bool OSDMonitor::can_mark_down(int i)
{
if (osdmap.test_flag(CEPH_OSDMAP_NODOWN)) {
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
index 42d241ec2b1..15eb469c129 100644
--- a/src/mon/OSDMonitor.h
+++ b/src/mon/OSDMonitor.h
@@ -171,9 +171,15 @@ private:
void remove_redundant_pg_temp();
void remove_down_pg_temp();
int reweight_by_utilization(int oload, std::string& out_str);
+
+ bool check_source(PaxosServiceMessage *m, uuid_d fsid);
+ bool preprocess_mark_me_down(class MOSDMarkMeDown *m);
+
+ friend class C_AckMarkedDown;
bool preprocess_failure(class MOSDFailure *m);
bool prepare_failure(class MOSDFailure *m);
+ bool prepare_mark_me_down(class MOSDMarkMeDown *m);
void process_failures();
void kick_all_failures();
diff --git a/src/msg/Message.cc b/src/msg/Message.cc
index 4b3c16b49da..77be03a590b 100644
--- a/src/msg/Message.cc
+++ b/src/msg/Message.cc
@@ -58,6 +58,7 @@ using namespace std;
#include "messages/MOSDAlive.h"
#include "messages/MOSDPGTemp.h"
#include "messages/MOSDFailure.h"
+#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
@@ -371,6 +372,9 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
case MSG_OSD_FAILURE:
m = new MOSDFailure();
break;
+ case MSG_OSD_MARK_ME_DOWN:
+ m = new MOSDMarkMeDown();
+ break;
case MSG_OSD_PING:
m = new MOSDPing();
break;
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 274537db7e6..1bf28e36f2d 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -68,6 +68,7 @@
#define MSG_OSD_BOOT 71
#define MSG_OSD_FAILURE 72
#define MSG_OSD_ALIVE 73
+#define MSG_OSD_MARK_ME_DOWN 74
#define MSG_OSD_SUBOP 76
#define MSG_OSD_SUBOPREPLY 77
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 7888c9fa395..5d3fd2e8e3c 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -2715,7 +2715,14 @@ bool FileStore::exists(coll_t cid, const hobject_t& oid)
int FileStore::stat(coll_t cid, const hobject_t& oid, struct stat *st)
{
int r = lfn_stat(cid, oid, st);
- dout(10) << "stat " << cid << "/" << oid << " = " << r << " (size " << st->st_size << ")" << dendl;
+ if (r < 0) {
+ dout(10) << "stat " << cid << "/" << oid
+ << " = " << r << dendl;
+ } else {
+ dout(10) << "stat " << cid << "/" << oid
+ << " = " << r
+ << " (size " << st->st_size << ")" << dendl;
+ }
return r;
}
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 576aeed08e2..978c24056f5 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -55,6 +55,7 @@
#include "messages/MPing.h"
#include "messages/MOSDPing.h"
#include "messages/MOSDFailure.h"
+#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDSubOp.h"
@@ -184,7 +185,9 @@ OSDService::OSDService(OSD *osd) :
in_progress_split_lock("OSDService::in_progress_split_lock"),
full_status_lock("OSDService::full_status_lock"),
cur_state(NONE),
- last_msg(0)
+ last_msg(0),
+ is_stopping_lock("OSDService::is_stopping_lock"),
+ state(NOT_STOPPING)
{}
void OSDService::_start_split(const set<pg_t> &pgs)
@@ -258,9 +261,16 @@ void OSDService::pg_stat_queue_dequeue(PG *pg)
void OSDService::shutdown()
{
reserver_finisher.stop();
- watch_lock.Lock();
- watch_timer.shutdown();
- watch_lock.Unlock();
+ {
+ Mutex::Locker l(watch_lock);
+ watch_timer.shutdown();
+ }
+ {
+ Mutex::Locker l(backfill_request_lock);
+ backfill_request_timer.shutdown();
+ }
+ osdmap = OSDMapRef();
+ next_osdmap = OSDMapRef();
}
void OSDService::init()
@@ -753,7 +763,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
const std::string &dev, const std::string &jdev) :
Dispatcher(external_messenger->cct),
osd_lock("OSD::osd_lock"),
- timer(external_messenger->cct, osd_lock),
+ tick_timer(external_messenger->cct, osd_lock),
authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(external_messenger->cct,
cct->_conf->auth_supported.length() ?
cct->_conf->auth_supported :
@@ -831,12 +841,14 @@ void OSD::handle_signal(int signum)
assert(signum == SIGINT || signum == SIGTERM);
derr << "*** Got signal " << sys_siglist[signum] << " ***" << dendl;
//suicide(128 + signum);
- suicide(0);
+ shutdown();
}
int OSD::pre_init()
{
Mutex::Locker lock(osd_lock);
+ if (is_stopping())
+ return 0;
assert(!store);
store = create_object_store(dev_path, journal_path);
@@ -906,8 +918,10 @@ public:
int OSD::init()
{
Mutex::Locker lock(osd_lock);
+ if (is_stopping())
+ return 0;
- timer.init();
+ tick_timer.init();
service.backfill_request_timer.init();
// mount.
@@ -1015,7 +1029,7 @@ int OSD::init()
heartbeat_thread.create();
// tick
- timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this));
+ tick_timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this));
AdminSocket *admin_socket = cct->get_admin_socket();
asok_hook = new OSDSocketHook(this);
@@ -1053,6 +1067,8 @@ int OSD::init()
monc->shutdown();
store->umount();
osd_lock.Lock(); // locker is going to unlock this on function exit
+ if (is_stopping())
+ return 0;
return r;
}
@@ -1061,6 +1077,8 @@ int OSD::init()
}
osd_lock.Lock();
+ if (is_stopping())
+ return 0;
dout(10) << "ensuring pgs have consumed prior maps" << dendl;
consume_map();
@@ -1167,37 +1185,47 @@ void OSD::suicide(int exitcode)
int OSD::shutdown()
{
- g_conf->remove_observer(this);
+ if (!service.prepare_to_stop())
+ return 0; // already shutting down
+ osd_lock.Lock();
+ if (is_stopping()) {
+ osd_lock.Unlock();
+ return 0;
+ }
+ derr << "shutdown" << dendl;
- service.shutdown();
+ heartbeat_lock.Lock();
+ state = STATE_STOPPING;
+ heartbeat_lock.Unlock();
+
+ // Debugging
g_ceph_context->_conf->set_val("debug_osd", "100");
g_ceph_context->_conf->set_val("debug_journal", "100");
g_ceph_context->_conf->set_val("debug_filestore", "100");
g_ceph_context->_conf->set_val("debug_ms", "100");
g_ceph_context->_conf->apply_changes(NULL);
- derr << "shutdown" << dendl;
-
- state = STATE_STOPPING;
-
- timer.shutdown();
-
- service.backfill_request_lock.Lock();
- service.backfill_request_timer.shutdown();
- service.backfill_request_lock.Unlock();
-
- heartbeat_lock.Lock();
- heartbeat_stop = true;
- heartbeat_cond.Signal();
- heartbeat_lock.Unlock();
- heartbeat_thread.join();
-
- command_tp.stop();
-
+ // Remove PGs
+ for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
+ p != pg_map.end();
+ ++p) {
+ dout(20) << " kicking pg " << p->first << dendl;
+ p->second->lock();
+ p->second->on_shutdown();
+ p->second->kick();
+ p->second->unlock();
+ p->second->put();
+ }
+ pg_map.clear();
+
// finish ops
- op_wq.drain();
- dout(10) << "no ops" << dendl;
+ op_wq.drain(); // should already be empty except for lagard PGs
+ {
+ Mutex::Locker l(finished_lock);
+ finished.clear(); // zap waiters (bleh, this is messy)
+ }
+ // unregister commands
cct->get_admin_socket()->unregister_command("dump_ops_in_flight");
cct->get_admin_socket()->unregister_command("dump_historic_ops");
cct->get_admin_socket()->unregister_command("dump_op_pq_state");
@@ -1211,48 +1239,33 @@ int OSD::shutdown()
delete test_ops_hook;
test_ops_hook = NULL;
+ osd_lock.Unlock();
+
+ heartbeat_lock.Lock();
+ heartbeat_stop = true;
+ heartbeat_cond.Signal();
+ heartbeat_lock.Unlock();
+ heartbeat_thread.join();
+
+ recovery_tp.drain();
recovery_tp.stop();
dout(10) << "recovery tp stopped" << dendl;
+
+ op_tp.drain();
op_tp.stop();
dout(10) << "op tp stopped" << dendl;
- // pause _new_ disk work first (to avoid racing with thread pool),
- disk_tp.pause_new();
- dout(10) << "disk tp paused (new), kicking all pgs" << dendl;
-
- // then kick all pgs,
- for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- dout(20) << " kicking pg " << p->first << dendl;
- p->second->lock();
- p->second->kick();
- p->second->unlock();
- }
- dout(20) << " kicked all pgs" << dendl;
+ command_tp.drain();
+ command_tp.stop();
+ dout(10) << "command tp stopped" << dendl;
- // then stop thread.
+ disk_tp.drain();
disk_tp.stop();
- dout(10) << "disk tp stopped" << dendl;
-
- // tell pgs we're shutting down
- for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- p->second->lock();
- p->second->on_shutdown();
- p->second->unlock();
- }
+ dout(10) << "disk tp paused (new), kicking all pgs" << dendl;
- osd_lock.Unlock();
- store->sync();
- store->flush();
osd_lock.Lock();
- // zap waiters (bleh, this is messy)
- finished_lock.Lock();
- finished.clear();
- finished_lock.Unlock();
+ tick_timer.shutdown();
// note unmount epoch
dout(10) << "noting clean unmount in epoch " << osdmap->get_epoch() << dendl;
@@ -1266,33 +1279,32 @@ int OSD::shutdown()
<< cpp_strerror(r) << dendl;
}
- // flush data to disk
- osd_lock.Unlock();
- dout(10) << "sync" << dendl;
+ dout(10) << "syncing store" << dendl;
+ store->flush();
store->sync();
- r = store->umount();
+ store->umount();
delete store;
store = 0;
- dout(10) << "sync done" << dendl;
- osd_lock.Lock();
-
- clear_pg_stat_queue();
+ dout(10) << "Store synced" << dendl;
- // close pgs
- for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- PG *pg = p->second;
- pg->put();
+ {
+ Mutex::Locker l(pg_stat_queue_lock);
+ assert(pg_stat_queue.empty());
}
- pg_map.clear();
+
+ g_conf->remove_observer(this);
+
+ monc->shutdown();
+ osd_lock.Unlock();
+
+ osdmap = OSDMapRef();
+ service.shutdown();
+ op_tracker.on_shutdown();
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
hbserver_messenger->shutdown();
-
- monc->shutdown();
return r;
}
@@ -1522,20 +1534,6 @@ PG *OSD::_lookup_lock_pg_with_map_lock_held(pg_t pgid)
return pg;
}
-PG *OSD::lookup_lock_raw_pg(pg_t pgid)
-{
- Mutex::Locker l(osd_lock);
- if (osdmap->have_pg_pool(pgid.pool())) {
- pgid = osdmap->raw_pg_to_pg(pgid);
- }
- if (!_have_pg(pgid)) {
- return NULL;
- }
- PG *pg = _lookup_lock_pg(pgid);
- return pg;
-}
-
-
void OSD::load_pgs()
{
assert(osd_lock.is_locked());
@@ -2075,10 +2073,11 @@ void OSD::_add_heartbeat_peer(int p)
void OSD::need_heartbeat_peer_update()
{
- heartbeat_lock.Lock();
+ Mutex::Locker l(heartbeat_lock);
+ if (is_stopping())
+ return;
dout(20) << "need_heartbeat_peer_update" << dendl;
heartbeat_need_update = true;
- heartbeat_lock.Unlock();
}
void OSD::maybe_update_heartbeat_peers()
@@ -2130,15 +2129,15 @@ void OSD::maybe_update_heartbeat_peers()
void OSD::reset_heartbeat_peers()
{
+ assert(osd_lock.is_locked());
dout(10) << "reset_heartbeat_peers" << dendl;
- heartbeat_lock.Lock();
+ Mutex::Locker l(heartbeat_lock);
while (!heartbeat_peers.empty()) {
hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
heartbeat_peers.begin()->second.con->put();
heartbeat_peers.erase(heartbeat_peers.begin());
}
failure_queue.clear();
- heartbeat_lock.Unlock();
}
void OSD::handle_osd_ping(MOSDPing *m)
@@ -2153,6 +2152,10 @@ void OSD::handle_osd_ping(MOSDPing *m)
int from = m->get_source().num();
heartbeat_lock.Lock();
+ if (is_stopping()) {
+ heartbeat_lock.Unlock();
+ return;
+ }
OSDMapRef curmap = service.get_osdmap();
@@ -2252,7 +2255,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
void OSD::heartbeat_entry()
{
- heartbeat_lock.Lock();
+ Mutex::Locker l(heartbeat_lock);
+ if (is_stopping())
+ return;
while (!heartbeat_stop) {
heartbeat();
@@ -2261,9 +2266,10 @@ void OSD::heartbeat_entry()
w.set_from_double(wait);
dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
heartbeat_cond.WaitInterval(g_ceph_context, heartbeat_lock, w);
+ if (is_stopping())
+ return;
dout(30) << "heartbeat_entry woke up" << dendl;
}
- heartbeat_lock.Unlock();
}
void OSD::heartbeat_check()
@@ -2367,6 +2373,10 @@ bool OSD::heartbeat_reset(Connection *con)
HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
if (s) {
heartbeat_lock.Lock();
+ if (is_stopping()) {
+ heartbeat_lock.Unlock();
+ return true;
+ }
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
p->second.con == con) {
@@ -2458,7 +2468,7 @@ void OSD::tick()
check_ops_in_flight();
- timer.add_event_after(1.0, new C_Tick(this));
+ tick_timer.add_event_after(1.0, new C_Tick(this));
}
void OSD::check_ops_in_flight()
@@ -2646,6 +2656,8 @@ void OSD::ms_handle_connect(Connection *con)
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
Mutex::Locker l(osd_lock);
+ if (is_stopping())
+ return;
dout(10) << "ms_handle_connect on mon" << dendl;
if (is_booting()) {
start_boot();
@@ -2693,6 +2705,8 @@ void OSD::start_boot()
void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
{
Mutex::Locker l(osd_lock);
+ if (is_stopping())
+ return;
dout(10) << "_maybe_boot mon has osdmaps " << oldest << ".." << newest << dendl;
if (is_initializing()) {
@@ -2855,6 +2869,7 @@ void OSDService::send_pg_temp()
void OSD::send_failures()
{
+ assert(osd_lock.is_locked());
bool locked = false;
if (!failure_queue.empty()) {
heartbeat_lock.Lock();
@@ -3359,7 +3374,6 @@ void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
bool OSD::heartbeat_dispatch(Message *m)
{
dout(30) << "heartbeat_dispatch " << m << dendl;
-
switch (m->get_type()) {
case CEPH_MSG_PING:
@@ -3389,9 +3403,18 @@ bool OSD::heartbeat_dispatch(Message *m)
bool OSD::ms_dispatch(Message *m)
{
+ if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
+ service.got_stop_ack();
+ return true;
+ }
+
// lock!
osd_lock.Lock();
+ if (is_stopping()) {
+ osd_lock.Unlock();
+ return true;
+ }
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
@@ -3845,6 +3868,37 @@ void OSDService::dec_scrubs_active()
sched_scrub_lock.Unlock();
}
+bool OSDService::prepare_to_stop() {
+ Mutex::Locker l(is_stopping_lock);
+ if (state != NOT_STOPPING)
+ return false;
+
+ state = PREPARING_TO_STOP;
+ monc->send_mon_message(
+ new MOSDMarkMeDown(
+ monc->get_fsid(),
+ get_osdmap()->get_inst(whoami),
+ get_osdmap()->get_epoch(),
+ false
+ ));
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t timeout;
+ timeout.set_from_double(
+ now + g_conf->osd_mon_shutdown_timeout);
+ while ((ceph_clock_now(g_ceph_context) < timeout) &&
+ (state != STOPPING)) {
+ is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+ }
+ state = STOPPING;
+ return true;
+}
+
+void OSDService::got_stop_ack() {
+ Mutex::Locker l(is_stopping_lock);
+ dout(10) << "Got stop ack" << dendl;
+ state = STOPPING;
+ is_stopping_cond.Signal();
+}
// =====================================================
// MAP
@@ -3867,6 +3921,7 @@ void OSD::wait_for_new_map(OpRequestRef op)
void OSD::note_down_osd(int peer)
{
+ assert(osd_lock.is_locked());
cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
heartbeat_lock.Lock();
@@ -4090,9 +4145,14 @@ void OSD::handle_osd_map(MOSDMap *m)
!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
!osdmap->get_hb_addr(whoami).probably_equals(hbserver_messenger->get_myaddr())) {
- if (!osdmap->is_up(whoami))
- clog.warn() << "map e" << osdmap->get_epoch()
- << " wrongly marked me down";
+ if (!osdmap->is_up(whoami)) {
+ if (service.is_preparing_to_stop()) {
+ service.got_stop_ack();
+ } else {
+ clog.warn() << "map e" << osdmap->get_epoch()
+ << " wrongly marked me down";
+ }
+ }
else if (!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()))
clog.error() << "map e" << osdmap->get_epoch()
<< " had wrong client addr (" << osdmap->get_addr(whoami)
@@ -4106,25 +4166,27 @@ void OSD::handle_osd_map(MOSDMap *m)
<< " had wrong hb addr (" << osdmap->get_hb_addr(whoami)
<< " != my " << hbserver_messenger->get_myaddr() << ")";
- state = STATE_BOOTING;
- up_epoch = 0;
- do_restart = true;
- bind_epoch = osdmap->get_epoch();
+ if (!service.is_stopping()) {
+ state = STATE_BOOTING;
+ up_epoch = 0;
+ do_restart = true;
+ bind_epoch = osdmap->get_epoch();
- int cport = cluster_messenger->get_myaddr().get_port();
- int hbport = hbserver_messenger->get_myaddr().get_port();
+ int cport = cluster_messenger->get_myaddr().get_port();
+ int hbport = hbserver_messenger->get_myaddr().get_port();
- int r = cluster_messenger->rebind(hbport);
- if (r != 0)
- do_shutdown = true; // FIXME: do_restart?
+ int r = cluster_messenger->rebind(hbport);
+ if (r != 0)
+ do_shutdown = true; // FIXME: do_restart?
- r = hbserver_messenger->rebind(cport);
- if (r != 0)
- do_shutdown = true; // FIXME: do_restart?
+ r = hbserver_messenger->rebind(cport);
+ if (r != 0)
+ do_shutdown = true; // FIXME: do_restart?
- hbclient_messenger->mark_down_all();
+ hbclient_messenger->mark_down_all();
- reset_heartbeat_peers();
+ reset_heartbeat_peers();
+ }
}
}
@@ -5733,17 +5795,6 @@ void OSD::_remove_pg(PG *pg)
*i, pg->osr, deleting));
}
- recovery_wq.dequeue(pg);
- scrub_wq.dequeue(pg);
- scrub_finalize_wq.dequeue(pg);
- snap_trim_wq.dequeue(pg);
- pg_stat_queue_dequeue(pg);
- op_wq.dequeue(pg);
- peering_wq.dequeue(pg);
-
- pg->deleting = true;
-
- pg->unreg_next_scrub();
// remove from map
pg_map.erase(pg->info.pgid);
@@ -6309,6 +6360,8 @@ struct C_CompleteSplits : public Context {
: osd(osd), pgs(in) {}
void finish(int r) {
Mutex::Locker l(osd->osd_lock);
+ if (osd->is_stopping())
+ return;
PG::RecoveryCtx rctx = osd->create_context();
set<pg_t> to_complete;
for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 1ce80e1414e..148b761f532 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -406,6 +406,24 @@ public:
void check_nearfull_warning(const osd_stat_t &stat);
bool check_failsafe_full();
+ // -- stopping --
+ Mutex is_stopping_lock;
+ Cond is_stopping_cond;
+ enum {
+ NOT_STOPPING,
+ PREPARING_TO_STOP,
+ STOPPING } state;
+ bool is_stopping() {
+ Mutex::Locker l(is_stopping_lock);
+ return state == STOPPING;
+ }
+ bool is_preparing_to_stop() {
+ Mutex::Locker l(is_stopping_lock);
+ return state == PREPARING_TO_STOP;
+ }
+ bool prepare_to_stop();
+ void got_stop_ack();
+
OSDService(OSD *osd);
};
class OSD : public Dispatcher,
@@ -419,7 +437,7 @@ public:
protected:
Mutex osd_lock; // global lock
- SafeTimer timer; // safe timer (osd_lock)
+ SafeTimer tick_timer; // safe timer (osd_lock)
AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
@@ -890,7 +908,6 @@ protected:
ObjectStore::Transaction& t);
PG *_lookup_qlock_pg(pg_t pgid);
- PG *lookup_lock_raw_pg(pg_t pgid);
PG* _make_pg(OSDMapRef createmap, pg_t pgid);
void add_newly_split_pg(PG *pg,
PG::RecoveryCtx *rctx);
@@ -1100,6 +1117,10 @@ protected:
}
void _process(Command *c) {
osd->osd_lock.Lock();
+ if (osd->is_stopping()) {
+ delete c;
+ return;
+ }
osd->do_command(c->con, c->tid, c->cmd, c->indata);
osd->osd_lock.Unlock();
delete c;
@@ -1354,6 +1375,10 @@ protected:
}
void _process(MOSDRepScrub *msg) {
osd->osd_lock.Lock();
+ if (osd->is_stopping()) {
+ osd->osd_lock.Unlock();
+ return;
+ }
if (osd->_have_pg(msg->pgid)) {
PG *pg = osd->_lookup_lock_pg(msg->pgid);
osd->osd_lock.Unlock();
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
index b3d95367ec5..d5ce8bbc749 100644
--- a/src/osd/OpRequest.cc
+++ b/src/osd/OpRequest.cc
@@ -20,8 +20,16 @@ static ostream& _prefix(std::ostream* _dout)
return *_dout << "--OSD::tracker-- ";
}
-void OpHistory::insert(utime_t now, OpRequest *op)
+void OpHistory::on_shutdown()
{
+ arrived.clear();
+ duration.clear();
+ shutdown = true;
+}
+
+void OpHistory::insert(utime_t now, OpRequestRef op)
+{
+ assert(!shutdown);
duration.insert(make_pair(op->get_duration(), op));
arrived.insert(make_pair(op->get_arrived(), op));
cleanup(now);
@@ -30,11 +38,11 @@ void OpHistory::insert(utime_t now, OpRequest *op)
void OpHistory::cleanup(utime_t now)
{
while (arrived.size() &&
- now - arrived.begin()->first > (double)(g_conf->osd_op_history_duration)) {
+ (now - arrived.begin()->first >
+ (double)(g_conf->osd_op_history_duration))) {
duration.erase(make_pair(
arrived.begin()->second->get_duration(),
arrived.begin()->second));
- delete arrived.begin()->second;
arrived.erase(arrived.begin());
}
@@ -42,7 +50,6 @@ void OpHistory::cleanup(utime_t now)
arrived.erase(make_pair(
duration.begin()->second->get_arrived(),
duration.begin()->second));
- delete duration.begin()->second;
duration.erase(duration.begin());
}
}
@@ -55,7 +62,7 @@ void OpHistory::dump_ops(utime_t now, Formatter *f)
f->dump_int("duration to keep", g_conf->osd_op_history_duration);
{
f->open_array_section("Ops");
- for (set<pair<utime_t, const OpRequest *> >::const_iterator i =
+ for (set<pair<utime_t, OpRequestRef> >::const_iterator i =
arrived.begin();
i != arrived.end();
++i) {
@@ -109,7 +116,7 @@ void OpTracker::unregister_inflight_op(OpRequest *i)
utime_t now = ceph_clock_now(g_ceph_context);
i->xitem.remove_myself();
i->request->clear_data();
- history.insert(now, i);
+ history.insert(now, OpRequestRef(i));
}
bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index e12fbe00c53..ca419f34ff8 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -26,18 +26,24 @@
#include "osd/osd_types.h"
class OpRequest;
+typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
class OpHistory {
- set<pair<utime_t, const OpRequest *> > arrived;
- set<pair<double, const OpRequest *> > duration;
+ set<pair<utime_t, OpRequestRef> > arrived;
+ set<pair<double, OpRequestRef> > duration;
void cleanup(utime_t now);
+ bool shutdown;
public:
- void insert(utime_t now, OpRequest *op);
+ OpHistory() : shutdown(false) {}
+ ~OpHistory() {
+ assert(arrived.empty());
+ assert(duration.empty());
+ }
+ void insert(utime_t now, OpRequestRef op);
void dump_ops(utime_t now, Formatter *f);
+ void on_shutdown();
};
-class OpRequest;
-typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
class OpTracker {
class RemoveOnDelete {
OpTracker *tracker;
@@ -70,6 +76,13 @@ public:
void mark_event(OpRequest *op, const string &evt);
void _mark_event(OpRequest *op, const string &evt, utime_t now);
OpRequestRef create_request(Message *req);
+ void on_shutdown() {
+ Mutex::Locker l(ops_in_flight_lock);
+ history.on_shutdown();
+ }
+ ~OpTracker() {
+ assert(ops_in_flight.empty());
+ }
};
/**
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 658eb2d0ac9..b24c74f8bf5 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -389,7 +389,7 @@ protected:
atomic_t ref;
public:
- bool deleting; // true while RemoveWQ should be chewing on us
+ bool deleting; // true while in removing or OSD is shutting down
void lock(bool no_lockdep = false);
void unlock() {
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index f87e334f48f..50318fc5869 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -6162,21 +6162,36 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
void ReplicatedPG::on_removal()
{
dout(10) << "on_removal" << dendl;
- apply_and_flush_repops(false);
- context_registry_on_change();
- clear_primary_state();
- osd->remove_want_pg_temp(info.pgid);
- cancel_recovery();
- osd->remote_reserver.cancel_reservation(info.pgid);
- osd->local_reserver.cancel_reservation(info.pgid);
+ on_shutdown();
}
void ReplicatedPG::on_shutdown()
{
dout(10) << "on_shutdown" << dendl;
+
+ // remove from queues
+ osd->recovery_wq.dequeue(this);
+ osd->scrub_wq.dequeue(this);
+ osd->scrub_finalize_wq.dequeue(this);
+ osd->snap_trim_wq.dequeue(this);
+ osd->pg_stat_queue_dequeue(this);
+ osd->dequeue_pg(this, 0);
+ osd->peering_wq.dequeue(this);
+
+ // handles queue races
+ deleting = true;
+
+ unreg_next_scrub();
apply_and_flush_repops(false);
context_registry_on_change();
+
+ osd->remote_reserver.cancel_reservation(info.pgid);
+ osd->local_reserver.cancel_reservation(info.pgid);
+
+ clear_primary_state();
+ osd->remove_want_pg_temp(info.pgid);
+ cancel_recovery();
}
void ReplicatedPG::on_flushed()