summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-05-22 08:44:52 -0700
committerSage Weil <sage@inktank.com>2013-05-22 16:13:37 -0700
commit27381c0c6259ac89f5f9c592b4bfb585937a1cfc (patch)
tree29e50e07a7ee4a4670036e7fa371b9ad7ee4849d /src
parent92a558bf0e5fee6d5250e1085427bff22fe4bbe4 (diff)
downloadceph-27381c0c6259ac89f5f9c592b4bfb585937a1cfc.tar.gz
osd: ping both front and back interfaces
Send ping requests to both the front and back hb addrs for peer osds. If the front hb addr is not present, do not send it and interpret a reply as coming from both. This handles the transition from old to new OSDs seamlessly. Note both the front and back rx times. Both need to be up to date in order for the peer to be healthy. Signed-off-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'src')
-rw-r--r--src/msg/Message.h4
-rw-r--r--src/osd/OSD.cc187
-rw-r--r--src/osd/OSD.h18
3 files changed, 142 insertions, 67 deletions
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 79a00c06fbe..18a64c1d02e 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -247,6 +247,10 @@ public:
return pipe != NULL;
}
+ Messenger *get_messenger() {
+ return msgr;
+ }
+
int get_peer_type() { return peer_type; }
void set_peer_type(int t) { peer_type = t; }
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 5b51b44f24d..a37c95e2c7a 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2250,16 +2250,24 @@ void OSD::_add_heartbeat_peer(int p)
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
- ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
- if (!con)
+ pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
+ if (!cons.first)
return;
hi = &heartbeat_peers[p];
- hi->con = con.get();
- hi->con->get();
hi->peer = p;
- hi->con->set_priv(new HeartbeatSession(p));
+ HeartbeatSession *s = new HeartbeatSession(p);
+ hi->con_back = cons.first.get();
+ hi->con_back->get();
+ hi->con_back->set_priv(s);
+ if (cons.second) {
+ hi->con_front = cons.second.get();
+ hi->con_front->get();
+ hi->con_front->set_priv(s->get());
+ }
dout(10) << "_add_heartbeat_peer: new peer osd." << p
- << " " << hi->con->get_peer_addr() << dendl;
+ << " " << hi->con_back->get_peer_addr()
+ << " " << (hi->con_front ? hi->con_front->get_peer_addr() : entity_addr_t())
+ << dendl;
} else {
hi = &i->second;
}
@@ -2310,10 +2318,15 @@ void OSD::maybe_update_heartbeat_peers()
while (p != heartbeat_peers.end()) {
if (p->second.epoch < osdmap->get_epoch()) {
dout(20) << " removing heartbeat peer osd." << p->first
- << " " << p->second.con->get_peer_addr()
+ << " " << p->second.con_back->get_peer_addr()
+ << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t())
<< dendl;
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_front);
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p++);
} else {
++p;
@@ -2328,8 +2341,13 @@ void OSD::reset_heartbeat_peers()
dout(10) << "reset_heartbeat_peers" << dendl;
Mutex::Locker l(heartbeat_lock);
while (!heartbeat_peers.empty()) {
- hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
- heartbeat_peers.begin()->second.con->put();
+ HeartbeatInfo& hi = heartbeat_peers.begin()->second;
+ hbclient_messenger->mark_down(hi.con_back);
+ hi.con_back->put();
+ if (hi.con_front) {
+ hbclient_messenger->mark_down(hi.con_front);
+ hi.con_front->put();
+ }
heartbeat_peers.erase(heartbeat_peers.begin());
}
failure_queue.clear();
@@ -2389,7 +2407,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
curmap->get_epoch(),
MOSDPing::PING_REPLY,
m->stamp);
- hb_back_server_messenger->send_message(r, m->get_connection());
+ m->get_connection()->get_messenger()->send_message(r, m->get_connection());
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
@@ -2407,12 +2425,26 @@ void OSD::handle_osd_ping(MOSDPing *m)
{
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
if (i != heartbeat_peers.end()) {
- dout(25) << "handle_osd_ping got reply from osd." << from
- << " first_rx " << i->second.first_tx
- << " last_tx " << i->second.last_tx
- << " last_rx " << i->second.last_rx << " -> " << m->stamp
- << dendl;
- i->second.last_rx = m->stamp;
+ if (m->get_connection() == i->second.con_back) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_rx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
+ << " last_rx_front " << i->second.last_rx_front
+ << dendl;
+ i->second.last_rx_back = m->stamp;
+ // if there is no front con, set both stamps.
+ if (i->second.con_front == NULL)
+ i->second.last_rx_front = m->stamp;
+ } else if (m->get_connection() == i->second.con_front) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_rx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back
+ << " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
+ << dendl;
+ i->second.last_rx_front = m->stamp;
+ }
}
if (m->map_epoch &&
@@ -2426,12 +2458,19 @@ void OSD::handle_osd_ping(MOSDPing *m)
}
}
- // Cancel false reports
- if (failure_queue.count(from))
- failure_queue.erase(from);
- if (failure_pending.count(from)) {
- send_still_alive(curmap->get_epoch(), failure_pending[from]);
- failure_pending.erase(from);
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_grace;
+ if (i->second.is_healthy(cutoff)) {
+ // Cancel false reports
+ if (failure_queue.count(from)) {
+ dout(10) << "handle_osd_ping canceling queued failure report for osd." << from<< dendl;
+ failure_queue.erase(from);
+ }
+ if (failure_pending.count(from)) {
+ dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << from<< dendl;
+ send_still_alive(curmap->get_epoch(), failure_pending[from]);
+ failure_pending.erase(from);
+ }
}
}
break;
@@ -2486,27 +2525,25 @@ void OSD::heartbeat_check()
dout(25) << "heartbeat_check osd." << p->first
<< " first_tx " << p->second.first_tx
<< " last_tx " << p->second.last_tx
- << " last_rx " << p->second.last_rx
+ << " last_rx_back " << p->second.last_rx_back
+ << " last_rx_front " << p->second.last_rx_front
<< dendl;
- if (p->second.last_rx == utime_t()) {
- if (p->second.last_tx == utime_t() ||
- p->second.first_tx > cutoff)
- continue; // just started sending recently
- derr << "heartbeat_check: no reply from osd." << p->first
- << " ever, first ping sent " << p->second.first_tx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_tx;
- } else {
- if (p->second.last_rx > cutoff)
- continue; // got recent reply
- derr << "heartbeat_check: no reply from osd." << p->first
- << " since " << p->second.last_rx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_rx;
+ if (!p->second.is_healthy(cutoff)) {
+ if (p->second.last_rx_back == utime_t() ||
+ p->second.last_rx_front == utime_t()) {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " ever on either front or back, first ping sent " << p->second.first_tx
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = p->second.last_tx;
+ } else {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " since back " << p->second.last_rx_back
+ << " front " << p->second.last_rx_front
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = MIN(p->second.last_rx_back, p->second.last_rx_front);
+ }
}
}
}
@@ -2537,16 +2574,21 @@ void OSD::heartbeat()
i != heartbeat_peers.end();
++i) {
int peer = i->first;
- dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
- Message *m = new MOSDPing(monc->get_fsid(),
- service.get_osdmap()->get_epoch(),
- MOSDPing::PING,
- now);
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
- hbclient_messenger->send_message(m, i->second.con);
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_back);
+ if (i->second.con_front)
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_front);
}
dout(30) << "heartbeat check" << dendl;
@@ -2580,20 +2622,30 @@ bool OSD::heartbeat_reset(Connection *con)
}
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
- p->second.con == con) {
- ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
- if (!newcon) {
+ p->second.con_back == con) {
+ pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ if (!newcon.first) {
dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
} else {
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = newcon.get();
- p->second.con->get();
- p->second.con->set_priv(s);
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back = newcon.first.get();
+ p->second.con_back->get();
+ p->second.con_back->set_priv(s);
+ if (p->second.con_front)
+ hbclient_messenger->mark_down(p->second.con_front);
+ if (newcon.second) {
+ p->second.con_front = newcon.second.get();
+ p->second.con_front->get();
+ p->second.con_front->set_priv(s->get());
+ } else {
+ p->second.con_front = NULL;
+ }
}
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
+ hbclient_messenger->mark_down(con);
}
- hbclient_messenger->mark_down(con);
heartbeat_lock.Unlock();
s->put();
}
@@ -3121,20 +3173,23 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
return ret;
}
-ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
// service map is always newer/newest
assert(from_epoch <= next_osdmap->get_epoch());
+ pair<ConnectionRef,ConnectionRef> ret;
if (next_osdmap->is_down(peer) ||
next_osdmap->get_info(peer).up_from > from_epoch) {
- return NULL;
+ return ret;
}
- ConnectionRef ret(
- osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer)));
- ret->put(); // Ref from get_connection
+ ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));
+ ret.first->put(); // Ref from get_connection
+ ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));
+ if (ret.second)
+ ret.second->put(); // Ref from get_connection
return ret;
}
@@ -4216,8 +4271,12 @@ void OSD::note_down_osd(int peer)
failure_pending.erase(peer);
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
if (p != heartbeat_peers.end()) {
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p);
}
heartbeat_lock.Unlock();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index b26e0598f4c..428284c85ab 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -295,7 +295,7 @@ public:
next_osdmap = map;
}
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
- ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
+ pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
void send_message_osd_cluster(Message *m, Connection *con) {
cluster_messenger->send_message(m, con);
@@ -696,11 +696,23 @@ private:
/// information about a heartbeat peer
struct HeartbeatInfo {
int peer; ///< peer
- Connection *con; ///< peer connection
+ Connection *con_front; ///< peer connection (front)
+ Connection *con_back; ///< peer connection (back)
utime_t first_tx; ///< time we sent our first ping request
utime_t last_tx; ///< last time we sent a ping request
- utime_t last_rx; ///< last time we got a ping reply
+ utime_t last_rx_front; ///< last time we got a ping reply on the front side
+ utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
+
+ bool is_healthy(utime_t cutoff) {
+ return
+ (last_rx_front > cutoff ||
+ (last_rx_front == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))) &&
+ (last_rx_back > cutoff ||
+ (last_rx_back == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff)));
+ }
};
/// state attached to outgoing heartbeat connections
struct HeartbeatSession : public RefCountedObject {