summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/osd/OSD.cc250
-rw-r--r--src/osd/OSD.h28
-rw-r--r--src/osd/OSDMap.h24
4 files changed, 230 insertions, 73 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 285f4d52335..5e87b2f1782 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -409,6 +409,7 @@ OPTION(osd_age_time, OPT_INT, 0)
OPTION(osd_heartbeat_addr, OPT_ADDR, entity_addr_t())
OPTION(osd_heartbeat_interval, OPT_INT, 6) // (seconds) how often we ping peers
OPTION(osd_heartbeat_grace, OPT_INT, 20) // (seconds) how long before we decide a peer has failed
+OPTION(osd_heartbeat_min_peers, OPT_INT, 10) // minimum number of peers
OPTION(osd_mon_heartbeat_interval, OPT_INT, 30) // (seconds) how often to ping monitor if no peers
OPTION(osd_mon_report_interval_max, OPT_INT, 120)
OPTION(osd_mon_report_interval_min, OPT_INT, 5) // pg stats, failures, up_thru, boot.
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 8993a1100f5..0ca3092372f 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2274,6 +2274,23 @@ void OSD::_add_heartbeat_peer(int p)
hi->epoch = osdmap->get_epoch();
}
+void OSD::_remove_heartbeat_peer(int n)
+{
+ map<int,HeartbeatInfo>::iterator q = heartbeat_peers.find(n);
+ assert(q != heartbeat_peers.end());
+ dout(20) << " removing heartbeat peer osd." << n
+ << " " << q->second.con_back->get_peer_addr()
+ << " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t())
+ << dendl;
+ hbclient_messenger->mark_down(q->second.con_back);
+ q->second.con_back->put();
+ if (q->second.con_front) {
+ hbclient_messenger->mark_down(q->second.con_front);
+ q->second.con_front->put();
+ }
+ heartbeat_peers.erase(q);
+}
+
void OSD::need_heartbeat_peer_update()
{
Mutex::Locker l(heartbeat_lock);
@@ -2286,53 +2303,109 @@ void OSD::need_heartbeat_peer_update()
void OSD::maybe_update_heartbeat_peers()
{
assert(osd_lock.is_locked());
- Mutex::Locker l(heartbeat_lock);
+ if (is_waiting_for_healthy()) {
+ utime_t now = ceph_clock_now(g_ceph_context);
+ if (last_heartbeat_resample == utime_t()) {
+ last_heartbeat_resample = now;
+ heartbeat_need_update = true;
+ } else if (!heartbeat_need_update) {
+ utime_t dur = now - last_heartbeat_resample;
+ if (dur > g_conf->osd_heartbeat_grace) {
+ dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl;
+ heartbeat_need_update = true;
+ last_heartbeat_resample = now;
+ reset_heartbeat_peers(); // we want *new* peers!
+ }
+ }
+ }
+
+ Mutex::Locker l(heartbeat_lock);
if (!heartbeat_need_update)
return;
heartbeat_need_update = false;
+ dout(10) << "maybe_update_heartbeat_peers updating" << dendl;
+
heartbeat_epoch = osdmap->get_epoch();
// build heartbeat from set
- for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
- i != pg_map.end();
- ++i) {
- PG *pg = i->second;
- pg->heartbeat_peer_lock.Lock();
- dout(20) << i->first << " heartbeat_peers " << pg->heartbeat_peers << dendl;
- for (set<int>::iterator p = pg->heartbeat_peers.begin();
- p != pg->heartbeat_peers.end();
- ++p)
- if (osdmap->is_up(*p))
- _add_heartbeat_peer(*p);
- for (set<int>::iterator p = pg->probe_targets.begin();
- p != pg->probe_targets.end();
- ++p)
- if (osdmap->is_up(*p))
- _add_heartbeat_peer(*p);
- pg->heartbeat_peer_lock.Unlock();
- }
-
+ if (is_active()) {
+ for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
+ i != pg_map.end();
+ ++i) {
+ PG *pg = i->second;
+ pg->heartbeat_peer_lock.Lock();
+ dout(20) << i->first << " heartbeat_peers " << pg->heartbeat_peers << dendl;
+ for (set<int>::iterator p = pg->heartbeat_peers.begin();
+ p != pg->heartbeat_peers.end();
+ ++p)
+ if (osdmap->is_up(*p))
+ _add_heartbeat_peer(*p);
+ for (set<int>::iterator p = pg->probe_targets.begin();
+ p != pg->probe_targets.end();
+ ++p)
+ if (osdmap->is_up(*p))
+ _add_heartbeat_peer(*p);
+ pg->heartbeat_peer_lock.Unlock();
+ }
+ }
+
+ // include next and previous up osds to ensure we have a fully-connected set
+ set<int> want, extras;
+ int next = osdmap->get_next_up_osd_after(whoami);
+ if (next >= 0)
+ want.insert(next);
+ int prev = osdmap->get_previous_up_osd_before(whoami);
+ if (prev >= 0)
+ want.insert(prev);
+
+ for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
+ dout(10) << " adding neighbor peer osd." << *p << dendl;
+ extras.insert(*p);
+ _add_heartbeat_peer(*p);
+ }
+
+ // remove down peers; enumerate extras
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
while (p != heartbeat_peers.end()) {
- if (p->second.epoch < osdmap->get_epoch()) {
- dout(20) << " removing heartbeat peer osd." << p->first
- << " " << p->second.con_back->get_peer_addr()
- << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t())
- << dendl;
- hbclient_messenger->mark_down(p->second.con_back);
- p->second.con_back->put();
- if (p->second.con_front) {
- hbclient_messenger->mark_down(p->second.con_front);
- p->second.con_front->put();
- }
- heartbeat_peers.erase(p++);
- } else {
+ if (!osdmap->is_up(p->first)) {
+ int o = p->first;
++p;
+ _remove_heartbeat_peer(o);
+ continue;
+ }
+ if (p->second.epoch < osdmap->get_epoch()) {
+ extras.insert(p->first);
}
+ ++p;
}
- dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers" << dendl;
+
+ // too few?
+ int start = osdmap->get_next_up_osd_after(whoami);
+ for (int n = start; n >= 0; ) {
+ if ((int)heartbeat_peers.size() >= g_conf->osd_heartbeat_min_peers)
+ break;
+ if (!extras.count(n) && !want.count(n) && n != whoami) {
+ dout(10) << " adding random peer osd." << n << dendl;
+ extras.insert(n);
+ _add_heartbeat_peer(n);
+ }
+ n = osdmap->get_next_up_osd_after(n);
+ if (n == start)
+ break; // came full circle; stop
+ }
+
+ // too many?
+ for (set<int>::iterator p = extras.begin();
+ (int)heartbeat_peers.size() > g_conf->osd_heartbeat_min_peers && p != extras.end();
+ ++p) {
+ if (want.count(*p))
+ continue;
+ _remove_heartbeat_peer(*p);
+ }
+
+ dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;
}
void OSD::reset_heartbeat_peers()
@@ -2417,6 +2490,13 @@ void OSD::handle_osd_ping(MOSDPing *m)
_share_map_outgoing(from, con.get());
}
}
+ } else if (curmap->get_down_at(from) > m->map_epoch) {
+ // tell them they have died
+ Message *r = new MOSDPing(monc->get_fsid(),
+ curmap->get_epoch(),
+ MOSDPing::YOU_DIED,
+ m->stamp);
+ m->get_connection()->get_messenger()->send_message(r, m->get_connection());
}
}
break;
@@ -2478,8 +2558,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
case MOSDPing::YOU_DIED:
dout(10) << "handle_osd_ping " << m->get_source_inst() << " says i am down in " << m->map_epoch
<< dendl;
- monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME);
- monc->renew_subs();
+ if (monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME))
+ monc->renew_subs();
break;
}
@@ -2528,7 +2608,7 @@ void OSD::heartbeat_check()
<< " last_rx_back " << p->second.last_rx_back
<< " last_rx_front " << p->second.last_rx_front
<< dendl;
- if (!p->second.is_healthy(cutoff)) {
+ if (p->second.is_unhealthy(cutoff)) {
if (p->second.last_rx_back == utime_t() ||
p->second.last_rx_front == utime_t()) {
derr << "heartbeat_check: no reply from osd." << p->first
@@ -2671,22 +2751,7 @@ void OSD::tick()
logger->set(l_osd_buf, buffer::get_total_alloc());
- if (is_waiting_for_healthy()) {
- if (g_ceph_context->get_heartbeat_map()->is_healthy()) {
- dout(1) << "healthy again, booting" << dendl;
- state = STATE_BOOTING;
- start_boot();
- }
- }
-
- if (is_active()) {
- // periodically kick recovery work queue
- recovery_tp.wake();
-
- if (!scrub_random_backoff()) {
- sched_scrub();
- }
-
+ if (is_active() || is_waiting_for_healthy()) {
map_lock.get_read();
maybe_update_heartbeat_peers();
@@ -2695,8 +2760,6 @@ void OSD::tick()
heartbeat_check();
heartbeat_lock.Unlock();
- check_replay_queue();
-
// mon report?
utime_t now = ceph_clock_now(g_ceph_context);
if (outstanding_pg_stats &&
@@ -2717,6 +2780,25 @@ void OSD::tick()
map_lock.put_read();
}
+ if (is_waiting_for_healthy()) {
+ if (_is_healthy()) {
+ dout(1) << "healthy again, booting" << dendl;
+ state = STATE_BOOTING;
+ start_boot();
+ }
+ }
+
+ if (is_active()) {
+ // periodically kick recovery work queue
+ recovery_tp.wake();
+
+ if (!scrub_random_backoff()) {
+ sched_scrub();
+ }
+
+ check_replay_queue();
+ }
+
// only do waiters if dispatch() isn't currently running. (if it is,
// it'll do the waiters, and doing them here may screw up ordering
// of op_queue vs handle_osd_map.)
@@ -3052,18 +3134,16 @@ void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
return;
}
- // if we are not healthy, do not mark ourselves up (yet)
- if (!g_ceph_context->get_heartbeat_map()->is_healthy()) {
- dout(5) << "not healthy, deferring boot" << dendl;
- state = STATE_WAITING_FOR_HEALTHY;
- return;
- }
-
// if our map within recent history, try to add ourselves to the osdmap.
if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
dout(5) << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
- } else if (!g_ceph_context->get_heartbeat_map()->is_healthy()) {
- dout(1) << "internal heartbeats indicate we are not healthy; waiting to boot" << dendl;
+ } else if (is_waiting_for_healthy() || !_is_healthy()) {
+ // if we are not healthy, do not mark ourselves up (yet)
+ dout(1) << "not healthy; waiting to boot" << dendl;
+ if (!is_waiting_for_healthy())
+ start_waiting_for_healthy();
+ // send pings sooner rather than later
+ heartbeat_kick();
} else if (osdmap->get_epoch() >= oldest - 1 &&
osdmap->get_epoch() + g_conf->osd_map_message_max > newest) {
_send_boot();
@@ -3078,6 +3158,41 @@ void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
monc->renew_subs();
}
+void OSD::start_waiting_for_healthy()
+{
+ dout(1) << "start_waiting_for_healthy" << dendl;
+ state = STATE_WAITING_FOR_HEALTHY;
+ last_heartbeat_resample = utime_t();
+}
+
+bool OSD::_is_healthy()
+{
+ if (!g_ceph_context->get_heartbeat_map()->is_healthy()) {
+ dout(1) << "is_healthy false -- internal heartbeat failed" << dendl;
+ return false;
+ }
+
+ if (is_waiting_for_healthy()) {
+ Mutex::Locker l(heartbeat_lock);
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_grace;
+ int num = 0, up = 0;
+ for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
+ p != heartbeat_peers.end();
+ ++p) {
+ if (p->second.is_healthy(cutoff))
+ ++up;
+ ++num;
+ }
+ if (up < num / 3) {
+ dout(1) << "is_healthy false -- only " << up << "/" << num << " up peers (less than 1/3)" << dendl;
+ return false;
+ }
+ }
+
+ return true;
+}
+
void OSD::_send_boot()
{
dout(10) << "_send_boot" << dendl;
@@ -4525,11 +4640,12 @@ void OSD::handle_osd_map(MOSDMap *m)
<< " != my " << hb_front_server_messenger->get_myaddr() << ")";
if (!service.is_stopping()) {
- state = STATE_BOOTING;
up_epoch = 0;
do_restart = true;
bind_epoch = osdmap->get_epoch();
+ start_waiting_for_healthy();
+
set<int> avoid_ports;
avoid_ports.insert(cluster_messenger->get_myaddr().get_port());
avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port());
@@ -4577,6 +4693,9 @@ void OSD::handle_osd_map(MOSDMap *m)
// yay!
consume_map();
+ if (is_active() || is_waiting_for_healthy())
+ maybe_update_heartbeat_peers();
+
if (!is_active()) {
dout(10) << " not yet active; waiting for peering wq to drain" << dendl;
peering_wq.drain();
@@ -4826,7 +4945,6 @@ void OSD::activate_map()
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
wake_all_pg_waiters(); // the pg mapping may have shifted
- maybe_update_heartbeat_peers();
if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 99d75dc40ad..effbb5e3533 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -704,15 +704,19 @@ private:
utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
- bool is_healthy(utime_t cutoff) {
+ bool is_unhealthy(utime_t cutoff) {
return
- (last_rx_front > cutoff ||
- (last_rx_front == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff))) &&
- (last_rx_back > cutoff ||
- (last_rx_back == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff)));
+ ! ((last_rx_front > cutoff ||
+ (last_rx_front == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))) &&
+ (last_rx_back > cutoff ||
+ (last_rx_back == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))));
+ }
+ bool is_healthy(utime_t cutoff) {
+ return last_rx_front > cutoff && last_rx_back > cutoff;
}
+
};
/// state attached to outgoing heartbeat connections
struct HeartbeatSession : public RefCountedObject {
@@ -730,8 +734,10 @@ private:
Messenger *hbclient_messenger;
Messenger *hb_front_server_messenger;
Messenger *hb_back_server_messenger;
+ utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
void _add_heartbeat_peer(int p);
+ void _remove_heartbeat_peer(int p);
bool heartbeat_reset(Connection *con);
void maybe_update_heartbeat_peers();
void reset_heartbeat_peers();
@@ -740,6 +746,11 @@ private:
void heartbeat_entry();
void need_heartbeat_peer_update();
+ void heartbeat_kick() {
+ Mutex::Locker l(heartbeat_lock);
+ heartbeat_cond.Signal();
+ }
+
struct T_Heartbeat : public Thread {
OSD *osd;
T_Heartbeat(OSD *o) : osd(o) {}
@@ -1116,6 +1127,9 @@ protected:
void start_boot();
void _maybe_boot(epoch_t oldest, epoch_t newest);
void _send_boot();
+
+ void start_waiting_for_healthy();
+ bool _is_healthy();
friend class C_OSD_GetVersion;
diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h
index deebc376a91..0a00c40e23b 100644
--- a/src/osd/OSDMap.h
+++ b/src/osd/OSDMap.h
@@ -404,6 +404,30 @@ private:
return -1;
}
+ int get_next_up_osd_after(int n) const {
+ for (int i = n + 1; i != n; ++i) {
+ if (i >= get_max_osd())
+ i = 0;
+ if (i == n)
+ break;
+ if (is_up(i))
+ return i;
+ }
+ return -1;
+ }
+
+ int get_previous_up_osd_before(int n) const {
+ for (int i = n - 1; i != n; --i) {
+ if (i < 0)
+ i = get_max_osd() - 1;
+ if (i == n)
+ break;
+ if (is_up(i))
+ return i;
+ }
+ return -1;
+ }
+
/**
* get feature bits required by the current structure
*