From 1652463dc0d39f327e8a8d4eaf770611d69f11cd Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 30 Nov 2012 13:47:27 -0800 Subject: wip livenessinfo, misc --- src/osd/OSD.cc | 70 +++++++++++++++++++++++++--------------------------- src/osd/OSD.h | 14 ++++++++--- src/osd/osd_types.cc | 4 +-- 3 files changed, 46 insertions(+), 42 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 65566111961..2e076e056f7 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1795,7 +1795,9 @@ void OSD::_add_heartbeat_peer(int p) hi->con = con.get(); hi->con->get(); hi->peer = p; - hi->con->set_priv(new HeartbeatSession(p)); + HeartbeatSession *s = new HeartbeatSession(p); + s->info = _get_peer_liveness(p, osdmap->get_info(p).up_from); + hi->con->set_priv(s); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con->get_peer_addr() << dendl; } else { @@ -1929,17 +1931,17 @@ void OSD::handle_osd_ping(MOSDPing *m) } } - HeartbeatSession *s = con->get_priv(); - if (s) { - s->last_ack = ceph_clock_now(g_ceph_context); - } else { + Connection *con = m->get_connection(); + HeartbeatSession *s = (HeartbeatSession*)con->get_priv(); + if (!s) { s = new HeartbeatSession(-1); + s->info = _get_peer_liveness(m->get_source().num(), m->up_from); s->get(); s->client = make_pair(m->get_source().num(), m->up_from); - heartbeat_clients[s->client].insert(s); con->set_priv(s); dout(20) << "handle_osd_ping new client session for " << con->get_peer_addr() << " " << s->client << " " << s << dendl; } + s->info->last_tx_ack = ceph_clock_now(g_ceph_context); } break; @@ -2040,41 +2042,35 @@ void OSD::heartbeat_check() } // trim old server sessions - utime_t cutoff = ceph_clock_now(g_ceph_context); - cutoff -= g_conf->osd_heartbeat_read_interval; - list::iterator p = heartbeat_clients_closed.begin(); - while (p != heartbeat_clients_closed.end()) { - HeartbeatSession *s = *p; - if (s->last_ack >= cutoff) - break; - dout(20) << "heartbeat_check trimming closed session " << s << dendl; - heartbeat_clients_closed.erase(p++); - - map, set >::iterator q = heartbeat_clients.find(s->client); - assert(q != heartbeat_clients.end()); - q->second.erase(s); - if (q->second.empty()) - heartbeat_clients.erase(q); - - s->put(); + { + utime_t cutoff = ceph_clock_now(g_ceph_context); + cutoff -= g_conf->osd_heartbeat_read_interval; + list::iterator p = heartbeat_clients_closed.begin(); + while (p != heartbeat_clients_closed.end()) { + HeartbeatSession *s = *p; + if (s->info->last_rx_ack >= cutoff || + s->info->last_tx_ack >= cutoff) + break; + dout(20) << "heartbeat_check trimming closed session " << s << dendl; + heartbeat_clients_closed.erase(p++); + s->put(); + } } } -utime_t OSD::get_last_hb_ack(int peer, epoch_t up_from) +OSD::LivenessInfoRef OSD::get_peer_liveness(int peer, epoch_t up_from) { Mutex::Locker l(heartbeat_lock); - map,set >::iterator p = heartbeat_clients.find(make_pair(peer, up_from)); - if (p == heartbeat_clients.end()) - return utime_t(); - set::iterator q = p->second.begin(); - utime_t min = (*q)->last_ack; - while (++q != heartbeat_clients.end()) { - if ((*q)->last_ack > min) { - min = (*q)->last_ack; - } + return _get_peer_liveness(peer, up_from); +} + +OSD::LivenessInfoRef OSD::_get_peer_liveness(int peer, epoch_t up_from) +{ + map,LivenessInfoRef>::iterator p = heartbeat_peer_info.find(make_pair(peer, up_from)); + if (p == heartbeat_peer_info.end()) { + p = heartbeat_peer_info.insert(make_pair(make_pair(peer, up_from), new LivenessInfo)); } - dout(20) << "get_last_hb_ack " << peer << "," << up_from << " " << min << dendl; - return min; + return p->second; } void OSD::heartbeat() @@ -3838,13 +3834,13 @@ void OSD::handle_osd_map(MOSDMap *m) hbclient_messenger->mark_down_all(); if (hbserver_messenger_previous) { - hbserver_messenger_prevoius->mark_down_all(); + hbserver_messenger_previous->mark_down_all(); hbserver_messenger_previous->shutdown(); // FIXME: don't leak! } hbserver_messenger_previous = hbserver_messenger; hbserver_messenger = create_hbserver_messenger(whoami, nonce); - entity_addr_t hb_addr = hbserver_messenger_previous->get_addr(); + entity_addr_t hb_addr = hbserver_messenger_previous->get_myaddr(); hb_addr.set_port(0); r = hbserver_messenger->bind(hb_addr); if (r != 0) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index f02c10e4b39..478ae1998cb 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -501,6 +501,12 @@ public: private: // -- heartbeat -- + /// liveness information about a peer, both acks sent and received. + struct LivenessInfo { + utime_t last_tx_ack; ///< last ping ack we sent + utime_t last_rx_ack; ///< last ping ack we received + }; + typedef boost::shared_ptr LivenessInfoRef; /// information about a heartbeat peer struct HeartbeatInfo { int peer; ///< peer @@ -513,9 +519,9 @@ private: /// state attached to incoming or outgoing heartbeat connections struct HeartbeatSession : public RefCountedObject { int peer; ///< if >= 0, we are a client connecting to this peer. - utime_t last_ack; bool closed; pair client; ///< client (osd, up_from) + LivenessInfoRef info; HeartbeatSession(int p) : peer(p), closed(false) {} @@ -535,7 +541,7 @@ private: map heartbeat_peers; ///< map of osd id to HeartbeatInfo utime_t last_mon_heartbeat; Messenger *hbclient_messenger, *hbserver_messenger, *hbserver_messenger_previous; - map,set > heartbeat_clients; ///< (epoch, up_from) -> sessions... + map,LivenessInfoRef> heartbeat_peer_info; ///< (epoch, up_from) -> sessions... list heartbeat_clients_closed; ///< closed sessions void _add_heartbeat_peer(int p); @@ -547,6 +553,8 @@ private: void heartbeat_entry(); void need_heartbeat_peer_update(); utime_t get_last_hb_ack(int peer, epoch_t up_from); + LivenessInfoRef _get_peer_liveness(int peer, epoch_t up_from); + LivenessInfoRef get_peer_liveness(int peer, epoch_t up_from); struct T_Heartbeat : public Thread { OSD *osd; @@ -1391,7 +1399,7 @@ public: void suicide(int exitcode); int shutdown(); - static Messenger *create_hbserver_messenger(); + static Messenger *create_hbserver_messenger(int whoami, uint64_t nonce); void handle_signal(int signum); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 3bd07cde73c..31a20cd2d25 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -1460,7 +1460,7 @@ void pg_interval_t::dump(Formatter *f) const f->open_array_section("acting"); for (vector::const_iterator p = acting.begin(); p != acting.end(); ++p) f->dump_int("osd", *p); - f->dump_stream("end_stamp") << duration; + f->dump_stream("end_stamp") << end_stamp; f->dump_unsigned("primary_up_from", primary_up_from); f->close_section(); } @@ -1504,7 +1504,7 @@ bool pg_interval_t::check_new_interval( i.up = old_up; i.end_stamp = osdmap->get_modified(); if (old_acting.size()) - i.primary_up_from = lastmap->get_osd_info(old_acting[0])->up_from; + i.primary_up_from = lastmap->get_info(old_acting[0]).up_from; if (i.acting.size() >= osdmap->get_pools().find(pool_id)->second.min_size) { -- cgit v1.2.1