diff options
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/osd/OSD.cc | 250 | ||||
-rw-r--r-- | src/osd/OSD.h | 28 | ||||
-rw-r--r-- | src/osd/OSDMap.h | 24 |
4 files changed, 230 insertions, 73 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 285f4d52335..5e87b2f1782 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -409,6 +409,7 @@ OPTION(osd_age_time, OPT_INT, 0) OPTION(osd_heartbeat_addr, OPT_ADDR, entity_addr_t()) OPTION(osd_heartbeat_interval, OPT_INT, 6) // (seconds) how often we ping peers OPTION(osd_heartbeat_grace, OPT_INT, 20) // (seconds) how long before we decide a peer has failed +OPTION(osd_heartbeat_min_peers, OPT_INT, 10) // minimum number of peers OPTION(osd_mon_heartbeat_interval, OPT_INT, 30) // (seconds) how often to ping monitor if no peers OPTION(osd_mon_report_interval_max, OPT_INT, 120) OPTION(osd_mon_report_interval_min, OPT_INT, 5) // pg stats, failures, up_thru, boot. diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8993a1100f5..0ca3092372f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -2274,6 +2274,23 @@ void OSD::_add_heartbeat_peer(int p) hi->epoch = osdmap->get_epoch(); } +void OSD::_remove_heartbeat_peer(int n) +{ + map<int,HeartbeatInfo>::iterator q = heartbeat_peers.find(n); + assert(q != heartbeat_peers.end()); + dout(20) << " removing heartbeat peer osd." << n + << " " << q->second.con_back->get_peer_addr() + << " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t()) + << dendl; + hbclient_messenger->mark_down(q->second.con_back); + q->second.con_back->put(); + if (q->second.con_front) { + hbclient_messenger->mark_down(q->second.con_front); + q->second.con_front->put(); + } + heartbeat_peers.erase(q); +} + void OSD::need_heartbeat_peer_update() { Mutex::Locker l(heartbeat_lock); @@ -2286,53 +2303,109 @@ void OSD::need_heartbeat_peer_update() void OSD::maybe_update_heartbeat_peers() { assert(osd_lock.is_locked()); - Mutex::Locker l(heartbeat_lock); + if (is_waiting_for_healthy()) { + utime_t now = ceph_clock_now(g_ceph_context); + if (last_heartbeat_resample == utime_t()) { + last_heartbeat_resample = now; + heartbeat_need_update = true; + } else if (!heartbeat_need_update) { + utime_t dur = now - last_heartbeat_resample; + if (dur > g_conf->osd_heartbeat_grace) { + dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl; + heartbeat_need_update = true; + last_heartbeat_resample = now; + reset_heartbeat_peers(); // we want *new* peers! + } + } + } + + Mutex::Locker l(heartbeat_lock); if (!heartbeat_need_update) return; heartbeat_need_update = false; + dout(10) << "maybe_update_heartbeat_peers updating" << dendl; + heartbeat_epoch = osdmap->get_epoch(); // build heartbeat from set - for (hash_map<pg_t, PG*>::iterator i = pg_map.begin(); - i != pg_map.end(); - ++i) { - PG *pg = i->second; - pg->heartbeat_peer_lock.Lock(); - dout(20) << i->first << " heartbeat_peers " << pg->heartbeat_peers << dendl; - for (set<int>::iterator p = pg->heartbeat_peers.begin(); - p != pg->heartbeat_peers.end(); - ++p) - if (osdmap->is_up(*p)) - _add_heartbeat_peer(*p); - for (set<int>::iterator p = pg->probe_targets.begin(); - p != pg->probe_targets.end(); - ++p) - if (osdmap->is_up(*p)) - _add_heartbeat_peer(*p); - pg->heartbeat_peer_lock.Unlock(); - } - + if (is_active()) { + for (hash_map<pg_t, PG*>::iterator i = pg_map.begin(); + i != pg_map.end(); + ++i) { + PG *pg = i->second; + pg->heartbeat_peer_lock.Lock(); + dout(20) << i->first << " heartbeat_peers " << pg->heartbeat_peers << dendl; + for (set<int>::iterator p = pg->heartbeat_peers.begin(); + p != pg->heartbeat_peers.end(); + ++p) + if (osdmap->is_up(*p)) + _add_heartbeat_peer(*p); + for (set<int>::iterator p = pg->probe_targets.begin(); + p != pg->probe_targets.end(); + ++p) + if (osdmap->is_up(*p)) + _add_heartbeat_peer(*p); + pg->heartbeat_peer_lock.Unlock(); + } + } + + // include next and previous up osds to ensure we have a fully-connected set + set<int> want, extras; + int next = osdmap->get_next_up_osd_after(whoami); + if (next >= 0) + want.insert(next); + int prev = osdmap->get_previous_up_osd_before(whoami); + if (prev >= 0) + want.insert(prev); + + for (set<int>::iterator p = want.begin(); p != want.end(); ++p) { + dout(10) << " adding neighbor peer osd." << *p << dendl; + extras.insert(*p); + _add_heartbeat_peer(*p); + } + + // remove down peers; enumerate extras map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin(); while (p != heartbeat_peers.end()) { - if (p->second.epoch < osdmap->get_epoch()) { - dout(20) << " removing heartbeat peer osd." << p->first - << " " << p->second.con_back->get_peer_addr() - << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t()) - << dendl; - hbclient_messenger->mark_down(p->second.con_back); - p->second.con_back->put(); - if (p->second.con_front) { - hbclient_messenger->mark_down(p->second.con_front); - p->second.con_front->put(); - } - heartbeat_peers.erase(p++); - } else { + if (!osdmap->is_up(p->first)) { + int o = p->first; ++p; + _remove_heartbeat_peer(o); + continue; + } + if (p->second.epoch < osdmap->get_epoch()) { + extras.insert(p->first); } + ++p; } - dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers" << dendl; + + // too few? + int start = osdmap->get_next_up_osd_after(whoami); + for (int n = start; n >= 0; ) { + if ((int)heartbeat_peers.size() >= g_conf->osd_heartbeat_min_peers) + break; + if (!extras.count(n) && !want.count(n) && n != whoami) { + dout(10) << " adding random peer osd." << n << dendl; + extras.insert(n); + _add_heartbeat_peer(n); + } + n = osdmap->get_next_up_osd_after(n); + if (n == start) + break; // came full circle; stop + } + + // too many? + for (set<int>::iterator p = extras.begin(); + (int)heartbeat_peers.size() > g_conf->osd_heartbeat_min_peers && p != extras.end(); + ++p) { + if (want.count(*p)) + continue; + _remove_heartbeat_peer(*p); + } + + dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl; } void OSD::reset_heartbeat_peers() @@ -2417,6 +2490,13 @@ void OSD::handle_osd_ping(MOSDPing *m) _share_map_outgoing(from, con.get()); } } + } else if (curmap->get_down_at(from) > m->map_epoch) { + // tell them they have died + Message *r = new MOSDPing(monc->get_fsid(), + curmap->get_epoch(), + MOSDPing::YOU_DIED, + m->stamp); + m->get_connection()->get_messenger()->send_message(r, m->get_connection()); } } break; @@ -2478,8 +2558,8 @@ void OSD::handle_osd_ping(MOSDPing *m) case MOSDPing::YOU_DIED: dout(10) << "handle_osd_ping " << m->get_source_inst() << " says i am down in " << m->map_epoch << dendl; - monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME); - monc->renew_subs(); + if (monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME)) + monc->renew_subs(); break; } @@ -2528,7 +2608,7 @@ void OSD::heartbeat_check() << " last_rx_back " << p->second.last_rx_back << " last_rx_front " << p->second.last_rx_front << dendl; - if (!p->second.is_healthy(cutoff)) { + if (p->second.is_unhealthy(cutoff)) { if (p->second.last_rx_back == utime_t() || p->second.last_rx_front == utime_t()) { derr << "heartbeat_check: no reply from osd." << p->first @@ -2671,22 +2751,7 @@ void OSD::tick() logger->set(l_osd_buf, buffer::get_total_alloc()); - if (is_waiting_for_healthy()) { - if (g_ceph_context->get_heartbeat_map()->is_healthy()) { - dout(1) << "healthy again, booting" << dendl; - state = STATE_BOOTING; - start_boot(); - } - } - - if (is_active()) { - // periodically kick recovery work queue - recovery_tp.wake(); - - if (!scrub_random_backoff()) { - sched_scrub(); - } - + if (is_active() || is_waiting_for_healthy()) { map_lock.get_read(); maybe_update_heartbeat_peers(); @@ -2695,8 +2760,6 @@ void OSD::tick() heartbeat_check(); heartbeat_lock.Unlock(); - check_replay_queue(); - // mon report? utime_t now = ceph_clock_now(g_ceph_context); if (outstanding_pg_stats && @@ -2717,6 +2780,25 @@ void OSD::tick() map_lock.put_read(); } + if (is_waiting_for_healthy()) { + if (_is_healthy()) { + dout(1) << "healthy again, booting" << dendl; + state = STATE_BOOTING; + start_boot(); + } + } + + if (is_active()) { + // periodically kick recovery work queue + recovery_tp.wake(); + + if (!scrub_random_backoff()) { + sched_scrub(); + } + + check_replay_queue(); + } + // only do waiters if dispatch() isn't currently running. (if it is, // it'll do the waiters, and doing them here may screw up ordering // of op_queue vs handle_osd_map.) @@ -3052,18 +3134,16 @@ void OSD::_maybe_boot(epoch_t oldest, epoch_t newest) return; } - // if we are not healthy, do not mark ourselves up (yet) - if (!g_ceph_context->get_heartbeat_map()->is_healthy()) { - dout(5) << "not healthy, deferring boot" << dendl; - state = STATE_WAITING_FOR_HEALTHY; - return; - } - // if our map within recent history, try to add ourselves to the osdmap. if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) { dout(5) << "osdmap NOUP flag is set, waiting for it to clear" << dendl; - } else if (!g_ceph_context->get_heartbeat_map()->is_healthy()) { - dout(1) << "internal heartbeats indicate we are not healthy; waiting to boot" << dendl; + } else if (is_waiting_for_healthy() || !_is_healthy()) { + // if we are not healthy, do not mark ourselves up (yet) + dout(1) << "not healthy; waiting to boot" << dendl; + if (!is_waiting_for_healthy()) + start_waiting_for_healthy(); + // send pings sooner rather than later + heartbeat_kick(); } else if (osdmap->get_epoch() >= oldest - 1 && osdmap->get_epoch() + g_conf->osd_map_message_max > newest) { _send_boot(); @@ -3078,6 +3158,41 @@ void OSD::_maybe_boot(epoch_t oldest, epoch_t newest) monc->renew_subs(); } +void OSD::start_waiting_for_healthy() +{ + dout(1) << "start_waiting_for_healthy" << dendl; + state = STATE_WAITING_FOR_HEALTHY; + last_heartbeat_resample = utime_t(); +} + +bool OSD::_is_healthy() +{ + if (!g_ceph_context->get_heartbeat_map()->is_healthy()) { + dout(1) << "is_healthy false -- internal heartbeat failed" << dendl; + return false; + } + + if (is_waiting_for_healthy()) { + Mutex::Locker l(heartbeat_lock); + utime_t cutoff = ceph_clock_now(g_ceph_context); + cutoff -= g_conf->osd_heartbeat_grace; + int num = 0, up = 0; + for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin(); + p != heartbeat_peers.end(); + ++p) { + if (p->second.is_healthy(cutoff)) + ++up; + ++num; + } + if (up < num / 3) { + dout(1) << "is_healthy false -- only " << up << "/" << num << " up peers (less than 1/3)" << dendl; + return false; + } + } + + return true; +} + void OSD::_send_boot() { dout(10) << "_send_boot" << dendl; @@ -4525,11 +4640,12 @@ void OSD::handle_osd_map(MOSDMap *m) << " != my " << hb_front_server_messenger->get_myaddr() << ")"; if (!service.is_stopping()) { - state = STATE_BOOTING; up_epoch = 0; do_restart = true; bind_epoch = osdmap->get_epoch(); + start_waiting_for_healthy(); + set<int> avoid_ports; avoid_ports.insert(cluster_messenger->get_myaddr().get_port()); avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port()); @@ -4577,6 +4693,9 @@ void OSD::handle_osd_map(MOSDMap *m) // yay! consume_map(); + if (is_active() || is_waiting_for_healthy()) + maybe_update_heartbeat_peers(); + if (!is_active()) { dout(10) << " not yet active; waiting for peering wq to drain" << dendl; peering_wq.drain(); @@ -4826,7 +4945,6 @@ void OSD::activate_map() dout(7) << "activate_map version " << osdmap->get_epoch() << dendl; wake_all_pg_waiters(); // the pg mapping may have shifted - maybe_update_heartbeat_peers(); if (osdmap->test_flag(CEPH_OSDMAP_FULL)) { dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 99d75dc40ad..effbb5e3533 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -704,15 +704,19 @@ private: utime_t last_rx_back; ///< last time we got a ping reply on the back side epoch_t epoch; ///< most recent epoch we wanted this peer - bool is_healthy(utime_t cutoff) { + bool is_unhealthy(utime_t cutoff) { return - (last_rx_front > cutoff || - (last_rx_front == utime_t() && (last_tx == utime_t() || - first_tx > cutoff))) && - (last_rx_back > cutoff || - (last_rx_back == utime_t() && (last_tx == utime_t() || - first_tx > cutoff))); + ! ((last_rx_front > cutoff || + (last_rx_front == utime_t() && (last_tx == utime_t() || + first_tx > cutoff))) && + (last_rx_back > cutoff || + (last_rx_back == utime_t() && (last_tx == utime_t() || + first_tx > cutoff)))); + } + bool is_healthy(utime_t cutoff) { + return last_rx_front > cutoff && last_rx_back > cutoff; } + }; /// state attached to outgoing heartbeat connections struct HeartbeatSession : public RefCountedObject { @@ -730,8 +734,10 @@ private: Messenger *hbclient_messenger; Messenger *hb_front_server_messenger; Messenger *hb_back_server_messenger; + utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state void _add_heartbeat_peer(int p); + void _remove_heartbeat_peer(int p); bool heartbeat_reset(Connection *con); void maybe_update_heartbeat_peers(); void reset_heartbeat_peers(); @@ -740,6 +746,11 @@ private: void heartbeat_entry(); void need_heartbeat_peer_update(); + void heartbeat_kick() { + Mutex::Locker l(heartbeat_lock); + heartbeat_cond.Signal(); + } + struct T_Heartbeat : public Thread { OSD *osd; T_Heartbeat(OSD *o) : osd(o) {} @@ -1116,6 +1127,9 @@ protected: void start_boot(); void _maybe_boot(epoch_t oldest, epoch_t newest); void _send_boot(); + + void start_waiting_for_healthy(); + bool _is_healthy(); friend class C_OSD_GetVersion; diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index deebc376a91..0a00c40e23b 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -404,6 +404,30 @@ private: return -1; } + int get_next_up_osd_after(int n) const { + for (int i = n + 1; i != n; ++i) { + if (i >= get_max_osd()) + i = 0; + if (i == n) + break; + if (is_up(i)) + return i; + } + return -1; + } + + int get_previous_up_osd_before(int n) const { + for (int i = n - 1; i != n; --i) { + if (i < 0) + i = get_max_osd() - 1; + if (i == n) + break; + if (is_up(i)) + return i; + } + return -1; + } + /** * get feature bits required by the current structure * |